[ https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500961#comment-16500961 ]
ASF GitHub Bot commented on KAFKA-6813: --------------------------------------- guozhangwang closed pull request #5075: KAFKA-6813: return to double-counting for count topology names URL: https://github.com/apache/kafka/pull/5075 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index ead8a7663ab..517104da323 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -224,9 +224,9 @@ Objects.requireNonNull(materialized, "materialized can't be null"); final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); - return internalStreamsBuilder.table(topic, - consumedInternal, - new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-")); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-"); + return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal); } /** @@ -273,12 +273,10 @@ Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); - return internalStreamsBuilder.table(topic, - consumedInternal, - new MaterializedInternal<>( - Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()), - 
internalStreamsBuilder, - topic + "-")); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde())); + materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-"); + return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal); } /** @@ -302,8 +300,9 @@ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-"); + return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde())), @@ -331,14 +330,11 @@ Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); - final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized = - new MaterializedInternal<>( - Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()), - internalStreamsBuilder, - topic + "-"); - + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde())); + materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-"); - return 
internalStreamsBuilder.globalTable(topic, consumedInternal, materialized); + return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal); } /** @@ -402,9 +398,10 @@ final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); // always use the serdes from consumed materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); - return internalStreamsBuilder.globalTable(topic, - consumedInternal, - new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-")); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-"); + + return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal); } /** @@ -436,8 +433,9 @@ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = - new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-"); + return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde())), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 3a9f9197ac7..74f930ee357 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -74,8 +74,10 @@ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); + + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -97,8 +99,9 @@ Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -117,14 +120,26 @@ @Override public KTable<K, Long> count() { - return count(Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>with(keySerde, Serdes.Long())); + return doCount(Materialized.with(keySerde, Serdes.Long())); } @Override public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + + // TODO: remove this when we do a topology-incompatible 
release + // we used to burn a topology name here, so we have to keep doing it for compatibility + if (new MaterializedInternal<>(materialized).storeName() == null) { + builder.newStoreName(AGGREGATE_NAME); + } + + return doCount(materialized); + } + + private KTable<K, Long> doCount(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) { + final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -133,9 +148,9 @@ } return doAggregate( - new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), - AGGREGATE_NAME, - materializedInternal); + new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), + AGGREGATE_NAME, + materializedInternal); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index db119f30fd5..49f258bf503 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -128,8 +128,9 @@ private void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSuppl Objects.requireNonNull(adder, "adder can't be null"); Objects.requireNonNull(subtractor, "subtractor can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -150,8 +151,9 @@ private void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSuppl @Override public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) { - final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -182,8 +184,9 @@ private void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSuppl Objects.requireNonNull(subtractor, "subtractor can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = - new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index bcd31bba16e..21c15058bb9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ 
-154,7 +154,10 @@ public String queryableStoreName() { final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), false); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME); + + return doFilter(predicate, materializedInternal, false); } @Override @@ -168,7 +171,10 @@ public String queryableStoreName() { final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), true); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME); + + return doFilter(predicate, materializedInternal, true); } private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? 
extends VR> mapper, @@ -210,7 +216,10 @@ public String queryableStoreName() { Objects.requireNonNull(mapper, "mapper can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doMapValues(withKey(mapper), new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME)); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, MAPVALUES_NAME); + + return doMapValues(withKey(mapper), materializedInternal); } @Override @@ -219,7 +228,10 @@ public String queryableStoreName() { Objects.requireNonNull(mapper, "mapper can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doMapValues(mapper, new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME)); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, MAPVALUES_NAME); + + return doMapValues(mapper, materializedInternal); } @Override @@ -233,8 +245,10 @@ public String queryableStoreName() { final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, final String... stateStoreNames) { Objects.requireNonNull(materialized, "materialized can't be null"); - return doTransformValues(transformerSupplier, - new MaterializedInternal<>(materialized, builder, TRANSFORMVALUES_NAME), stateStoreNames); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, TRANSFORMVALUES_NAME); + + return doTransformValues(transformerSupplier, materializedInternal, stateStoreNames); } private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? 
extends VR> transformerSupplier, @@ -304,7 +318,10 @@ public V apply(final K key, final Change<V> change) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), false, false); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); + + return doJoin(other, joiner, materializedInternal, false, false); } @Override @@ -317,7 +334,10 @@ public V apply(final K key, final Change<V> change) { public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { - return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), true, true); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); + + return doJoin(other, joiner, materializedInternal, true, true); } @Override @@ -330,11 +350,9 @@ public V apply(final K key, final Change<V> change) { public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { - return doJoin(other, - joiner, - new MaterializedInternal<>(materialized, builder, MERGE_NAME), - true, - false); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); + return doJoin(other, joiner, materializedInternal, true, false); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index c933b8687f5..5361e48c1d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -25,18 +25,17 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> { - private final boolean queryable; + private final boolean queriable; - - public MaterializedInternal(final Materialized<K, V, S> materialized, - final InternalNameProvider nameProvider, - final String generatedStorePrefix) { + public MaterializedInternal(final Materialized<K, V, S> materialized) { super(materialized); + queriable = storeName() != null; + } + + public void generateStoreNameIfNeeded(final InternalNameProvider nameProvider, + final String generatedStorePrefix) { if (storeName() == null) { - queryable = false; storeName = nameProvider.newStoreName(generatedStorePrefix); - } else { - queryable = true; } } @@ -63,7 +62,7 @@ public boolean loggingEnabled() { return loggingEnabled; } - public Map<String, String> logConfig() { + Map<String, String> logConfig() { return topicConfig; } @@ -72,6 +71,6 @@ boolean cachingEnabled() { } boolean isQueryable() { - return queryable; + return queriable; } } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index c29c6566c2f..b3cbacd57f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -68,15 +68,26 @@ public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) { @Override public KTable<Windowed<K>, Long> count() { - return count(Materialized.<K, Long, SessionStore<Bytes, byte[]>>with(keySerde, Serdes.Long())); + return doCount(Materialized.with(keySerde, Serdes.Long())); } - @SuppressWarnings("unchecked") @Override public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + + // TODO: remove this when we do a topology-incompatible release + // we used to burn a topology name here, so we have to keep doing it for compatibility + if (new MaterializedInternal<>(materialized).storeName() == null) { + builder.newStoreName(AGGREGATE_NAME); + } + + return doCount(materialized); + } + + @SuppressWarnings("unchecked") + private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) { + final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -85,10 +96,10 @@ public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) { } return 
(KTable<Windowed<K>, Long>) aggregateBuilder.build( - new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, countMerger), - AGGREGATE_NAME, - materialize(materializedInternal), - materializedInternal.isQueryable()); + new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, countMerger), + AGGREGATE_NAME, + materialize(materializedInternal), + materializedInternal.isQueryable()); } @Override @@ -103,8 +114,8 @@ public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) { Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer); - final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); + final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME); if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -136,8 +147,9 @@ public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) { Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(sessionMerger, "sessionMerger can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); + if 
(materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index d1e5a175847..4f5301b53b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -24,9 +24,9 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -63,15 +63,27 @@ @Override public KTable<Windowed<K>, Long> count() { - return count(Materialized.<K, Long, WindowStore<Bytes, byte[]>>with(keySerde, Serdes.Long())); + return doCount(Materialized.with(keySerde, Serdes.Long())); } - @SuppressWarnings("unchecked") @Override public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + + // TODO: remove this when we do a topology-incompatible release + // we used to burn a topology name here, so we have to keep doing it for compatibility + if (new MaterializedInternal<>(materialized).storeName() == null) { + builder.newStoreName(AGGREGATE_NAME); + } + + return doCount(materialized); + } + + 
@SuppressWarnings("unchecked") + private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) { + final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -80,9 +92,9 @@ } return (KTable<Windowed<K>, Long>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), - AGGREGATE_NAME, - materialize(materializedInternal), - materializedInternal.isQueryable()); + AGGREGATE_NAME, + materialize(materializedInternal), + materializedInternal.isQueryable()); } @@ -100,8 +112,8 @@ Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME); if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -122,8 +134,9 @@ Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); + final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + 
materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index a845be3569f..f8f9c7d68a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -16,8 +16,12 @@ */ package org.apache.kafka.streams; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -40,6 +44,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class TopologyTest { @@ -76,12 +81,7 @@ public void shouldNotAllowZeroTopicsWhenAddingSource() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullNameWhenAddingProcessor() { - topology.addProcessor(null, new ProcessorSupplier() { - @Override - public Processor get() { - return new MockProcessorSupplier().get(); - } - }); + topology.addProcessor(null, () -> new MockProcessorSupplier().get()); } @Test(expected = NullPointerException.class) @@ -130,7 +130,7 @@ public void shouldNotAllowToAddSourcesWithSameName() { try { topology.addSource("source", "topic-2"); fail("Should throw TopologyException for duplicate source name"); - } catch (TopologyException expected) { } + } catch (final TopologyException 
expected) { } } @Test @@ -139,7 +139,7 @@ public void shouldNotAllowToAddTopicTwice() { try { topology.addSource("source-2", "topic-1"); fail("Should throw TopologyException for already used topic"); - } catch (TopologyException expected) { } + } catch (final TopologyException expected) { } } @Test @@ -167,7 +167,7 @@ public void shouldNotAllowToAddProcessorWithSameName() { try { topology.addProcessor("processor", new MockProcessorSupplier(), "source"); fail("Should throw TopologyException for duplicate processor name"); - } catch (TopologyException expected) { } + } catch (final TopologyException expected) { } } @Test(expected = TopologyException.class) @@ -187,7 +187,7 @@ public void shouldNotAllowToAddSinkWithSameName() { try { topology.addSink("sink", "topic-3", "source"); fail("Should throw TopologyException for duplicate sink name"); - } catch (TopologyException expected) { } + } catch (final TopologyException expected) { } } @Test(expected = TopologyException.class) @@ -257,7 +257,7 @@ public void shouldNotAllowToAddStoreWithSameName() { } @Test - public void shouldThrowOnUnassignedStateStoreAccess() throws Exception { + public void shouldThrowOnUnassignedStateStoreAccess() { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; final String badNodeName = "badGuy"; @@ -283,7 +283,7 @@ public void shouldThrowOnUnassignedStateStoreAccess() throws Exception { } catch (final StreamsException e) { final String error = e.toString(); final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName; - + assertThat(error, equalTo(expectedMessage)); } } @@ -295,12 +295,12 @@ public void shouldThrowOnUnassignedStateStoreAccess() throws Exception { public Processor get() { return new Processor() { @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { context.getStateStore(STORE_NAME); } @Override - public void process(Object 
key, Object value) { } + public void process(final Object key, final Object value) { } @Override public void close() { } @@ -324,7 +324,7 @@ public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { @Test public void shouldDescribeEmptyTopology() { - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -333,9 +333,9 @@ public void singleSourceShouldHaveSingleSubtopology() { expectedDescription.addSubtopology( new InternalTopologyBuilder.Subtopology(0, - Collections.<TopologyDescription.Node>singleton(expectedSourceNode))); + Collections.singleton(expectedSourceNode))); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -344,9 +344,9 @@ public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() { expectedDescription.addSubtopology( new InternalTopologyBuilder.Subtopology(0, - Collections.<TopologyDescription.Node>singleton(expectedSourceNode))); + Collections.singleton(expectedSourceNode))); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -355,9 +355,9 @@ public void singleSourcePatternShouldHaveSingleSubtopology() { expectedDescription.addSubtopology( new InternalTopologyBuilder.Subtopology(0, - Collections.<TopologyDescription.Node>singleton(expectedSourceNode))); + Collections.singleton(expectedSourceNode))); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -365,19 +365,19 @@ public void multipleSourcesShouldHaveDistinctSubtopologies() { final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1"); expectedDescription.addSubtopology( new 
InternalTopologyBuilder.Subtopology(0, - Collections.<TopologyDescription.Node>singleton(expectedSourceNode1))); + Collections.singleton(expectedSourceNode1))); final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2"); expectedDescription.addSubtopology( new InternalTopologyBuilder.Subtopology(1, - Collections.<TopologyDescription.Node>singleton(expectedSourceNode2))); + Collections.singleton(expectedSourceNode2))); final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3"); expectedDescription.addSubtopology( new InternalTopologyBuilder.Subtopology(2, - Collections.<TopologyDescription.Node>singleton(expectedSourceNode3))); + Collections.singleton(expectedSourceNode3))); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -390,7 +390,7 @@ public void sourceAndProcessorShouldHaveSingleSubtopology() { allNodes.add(expectedProcessorNode); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -405,7 +405,7 @@ public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() { allNodes.add(expectedProcessorNode); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -421,7 +421,7 @@ public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() { allNodes.add(expectedProcessorNode); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), 
equalTo(expectedDescription)); } @Test @@ -436,7 +436,7 @@ public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() { allNodes.add(expectedProcessorNode2); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -451,7 +451,7 @@ public void processorWithMultipleSourcesShouldHaveSingleSubtopology() { allNodes.add(expectedProcessorNode); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -480,7 +480,7 @@ public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() { allNodes3.add(expectedProcessorNode3); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -509,7 +509,7 @@ public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() { allNodes3.add(expectedSinkNode3); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -540,7 +540,7 @@ public void processorsWithSameSinkShouldHaveSameSubtopology() { allNodes.add(expectedSinkNode); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test @@ -570,30 +570,303 @@ public void 
processorsWithSharedStateShouldHaveSameSubtopology() { allNodes.add(expectedProcessorNode3); expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test public void shouldDescribeGlobalStoreTopology() { addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", 0); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); } @Test public void shouldDescribeMultipleGlobalStoreTopology() { addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", 0); addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", 1); - assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); + assertThat(topology.describe(), equalTo(expectedDescription)); + } + + @Test + public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .count(); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000002\n" + + " Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void kGroupedStreamNamedMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .count(Materialized.as("count-store")); + final TopologyDescription describe = 
builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000001\n" + + " Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void kGroupedStreamAnonymousMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .count(Materialized.with(null, Serdes.Long())); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void timeWindowZeroArgCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .windowedBy(TimeWindows.of(1)) + .count(); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000002\n" + + " Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void timeWindowNamedMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .windowedBy(TimeWindows.of(1)) + .count(Materialized.as("count-store")); + final TopologyDescription 
describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000001\n" + + " Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void timeWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .windowedBy(TimeWindows.of(1)) + .count(Materialized.with(null, Serdes.Long())); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void sessionWindowZeroArgCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .windowedBy(SessionWindows.with(1)) + .count(); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000002\n" + + " Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void sessionWindowNamedMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .windowedBy(SessionWindows.with(1)) + 
.count(Materialized.as("count-store")); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000001\n" + + " Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void sessionWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input-topic") + .groupByKey() + .windowedBy(SessionWindows.with(1)) + .count(Materialized.with(null, Serdes.Long())); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString() + ); + } + + @Test + public void tableZeroArgCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.table("input-topic") + .groupBy((key, value) -> null) + .count(); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" + + " --> KTABLE-SOURCE-0000000002\n" + + " Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" + + " --> KTABLE-SELECT-0000000003\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: KTABLE-SELECT-0000000003 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- KTABLE-SOURCE-0000000002\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n" + 
+ " <-- KTABLE-SELECT-0000000003\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n" + + " --> KTABLE-AGGREGATE-0000000007\n" + + " Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000006\n" + + "\n", + describe.toString() + ); + } + + @Test + public void tableNamedMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.table("input-topic") + .groupBy((key, value) -> null) + .count(Materialized.as("count-store")); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" + + " --> KTABLE-SOURCE-0000000002\n" + + " Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" + + " --> KTABLE-SELECT-0000000003\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: KTABLE-SELECT-0000000003 (stores: [])\n" + + " --> KSTREAM-SINK-0000000004\n" + + " <-- KTABLE-SOURCE-0000000002\n" + + " Sink: KSTREAM-SINK-0000000004 (topic: count-store-repartition)\n" + + " <-- KTABLE-SELECT-0000000003\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000005 (topics: [count-store-repartition])\n" + + " --> KTABLE-AGGREGATE-0000000006\n" + + " Processor: KTABLE-AGGREGATE-0000000006 (stores: [count-store])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000005\n" + + "\n", + describe.toString() + ); + } + + @Test + public void tableAnonymousMaterializedCountShouldPreserveTopologyStructure() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.table("input-topic") + .groupBy((key, value) -> null) + .count(Materialized.with(null, Serdes.Long())); + final TopologyDescription describe = builder.build().describe(); + assertEquals( + "Topologies:\n" + + " 
Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" + + " --> KTABLE-SOURCE-0000000002\n" + + " Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" + + " --> KTABLE-SELECT-0000000003\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: KTABLE-SELECT-0000000003 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- KTABLE-SOURCE-0000000002\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n" + + " <-- KTABLE-SELECT-0000000003\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n" + + " --> KTABLE-AGGREGATE-0000000007\n" + + " Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000006\n" + + "\n", + describe.toString() + ); } private TopologyDescription.Source addSource(final String sourceName, final String... 
sourceTopic) { topology.addSource(null, sourceName, null, null, null, sourceTopic); - String allSourceTopics = sourceTopic[0]; + final StringBuilder allSourceTopics = new StringBuilder(sourceTopic[0]); for (int i = 1; i < sourceTopic.length; ++i) { - allSourceTopics += ", " + sourceTopic[i]; + allSourceTopics.append(", ").append(sourceTopic[i]); } - return new InternalTopologyBuilder.Source(sourceName, allSourceTopics); + return new InternalTopologyBuilder.Source(sourceName, allSourceTopics.toString()); } private TopologyDescription.Source addSource(final String sourceName, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index 79bf81e6bb2..63432ffc439 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.MockValueJoiner; -import org.junit.Before; import org.junit.Test; import java.util.Collections; @@ -58,12 +57,11 @@ private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder()); private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(); private final String storePrefix = "prefix-"; - private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), builder, storePrefix); + private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store")); - @Before - public void setUp() { + { 
builder.internalTopologyBuilder.setApplicationId(APP_ID); + materialized.generateStoreNameIfNeeded(builder, storePrefix); } @Test @@ -127,12 +125,10 @@ public boolean test(final String key, final String value) { @Test public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() { - KTable table1 = builder.table("topic2", - consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null), - builder, - storePrefix)); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.with(null, null)); + materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); + final KTable table1 = builder.table("topic2", consumed, materializedInternal); final ProcessorTopology topology = builder.internalTopologyBuilder.build(null); @@ -147,38 +143,33 @@ public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() { @Test public void shouldBuildGlobalTableWithNonQueryableStoreName() { - final GlobalKTable<String, String> table1 = builder.globalTable( - "topic2", - consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null), - builder, - storePrefix)); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.with(null, null)); + materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); + + final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal); assertNull(table1.queryableStoreName()); } @Test public void shouldBuildGlobalTableWithQueryaIbleStoreName() { - final GlobalKTable<String, String> table1 = builder.globalTable( - "topic2", - consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"), - builder, - storePrefix)); + final MaterializedInternal<String, 
String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.as("globalTable")); + materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); + final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal); assertEquals("globalTable", table1.queryableStoreName()); } @Test public void shouldBuildSimpleGlobalTableTopology() { + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.as("globalTable")); + materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); builder.globalTable("table", consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"), - builder, - storePrefix)); + materializedInternal); final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology(); final List<StateStore> stateStores = topology.globalStateStores(); @@ -199,15 +190,18 @@ private void doBuildGlobalTopologyWithAllGlobalTables() { @Test public void shouldBuildGlobalTopologyWithAllGlobalTables() { - builder.globalTable("table", - consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1"), builder, storePrefix)); - builder.globalTable("table2", - consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2"), builder, storePrefix)); - + { + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.as("global1")); + materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); + builder.globalTable("table", consumed, materializedInternal); + } + { + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.as("global2")); + 
materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); + builder.globalTable("table2", consumed, materializedInternal); + } doBuildGlobalTopologyWithAllGlobalTables(); } @@ -216,25 +210,22 @@ public void shouldAddGlobalTablesToEachGroup() { final String one = "globalTable"; final String two = "globalTable2"; - final GlobalKTable<String, String> globalTable = builder.globalTable("table", - consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one), builder, storePrefix)); - final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", - consumed, - new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two), builder, storePrefix)); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.as(one)); + materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); + final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, materializedInternal); - final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), builder, storePrefix); - builder.table("not-global", consumed, materialized); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal2 = + new MaterializedInternal<>(Materialized.as(two)); + materializedInternal2.generateStoreNameIfNeeded(builder, storePrefix); + final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, materializedInternal2); - final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() { - @Override - public String apply(final String key, final String value) { - return value; - } - }; + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> 
materializedInternalNotGlobal = + new MaterializedInternal<>(Materialized.as("not-global")); + materializedInternalNotGlobal.generateStoreNameIfNeeded(builder, storePrefix); + builder.table("not-global", consumed, materializedInternalNotGlobal); + + final KeyValueMapper<String, String, String> kvMapper = (key, value) -> value; final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed); stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER); @@ -260,9 +251,10 @@ public String apply(final String key, final String value) { public void shouldMapStateStoresToCorrectSourceTopics() { final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed); - final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), builder, storePrefix); - final KTable<String, String> table = builder.table("table-topic", consumed, materialized); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.as("table-store")); + materializedInternal.generateStoreNameIfNeeded(builder, storePrefix); + final KTable<String, String> table = builder.table("table-topic", consumed, materializedInternal); assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store")); final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java index 5fd76f3acfe..1a83ef13152 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java @@ -49,8 +49,9 @@ public void shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() { EasyMock.replay(nameProvider); - final MaterializedInternal<Object, Object, StateStore> materialized - = new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix); + final MaterializedInternal<Object, Object, StateStore> materialized = + new MaterializedInternal<>(Materialized.with(null, null)); + materialized.generateStoreNameIfNeeded(nameProvider, prefix); assertThat(materialized.storeName(), equalTo(generatedName)); EasyMock.verify(nameProvider); @@ -59,8 +60,9 @@ public void shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() { @Test public void shouldUseProvidedStoreNameWhenSet() { final String storeName = "store-name"; - final MaterializedInternal<Object, Object, StateStore> materialized - = new MaterializedInternal<>(Materialized.as(storeName), nameProvider, prefix); + final MaterializedInternal<Object, Object, StateStore> materialized = + new MaterializedInternal<>(Materialized.as(storeName)); + materialized.generateStoreNameIfNeeded(nameProvider, prefix); assertThat(materialized.storeName(), equalTo(storeName)); } @@ -69,8 +71,9 @@ public void shouldUseStoreNameOfSupplierWhenProvided() { final String storeName = "other-store-name"; EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes(); EasyMock.replay(supplier); - final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix); + final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = + new MaterializedInternal<>(Materialized.as(supplier)); + materialized.generateStoreNameIfNeeded(nameProvider, prefix); assertThat(materialized.storeName(), equalTo(storeName)); } } \ No newline at end of file diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index c71f4690c4b..f3e9299cb50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -73,19 +73,21 @@ @Before public void before() { final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( - Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>with(null, null), + Materialized.with(null, null)); + materialized.generateStoreNameIfNeeded( new InternalNameProvider() { @Override - public String newProcessorName(String prefix) { + public String newProcessorName(final String prefix) { return "processorName"; } @Override - public String newStoreName(String prefix) { + public String newStoreName(final String prefix) { return GLOBAL_STORE_NAME; } }, - "store-"); + "store-" + ); builder.addGlobalStore( (StoreBuilder) new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java index 9ba86ac7e14..fc243c61f8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java @@ -53,10 +53,10 @@ @Test public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() { - final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"), - 
nameProvider, - storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = + new MaterializedInternal<>(Materialized.as("store")); + materialized.generateStoreNameIfNeeded(nameProvider, storePrefix); + final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -69,9 +69,10 @@ public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnable @Test public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() { - final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") - .withCachingDisabled(), nameProvider, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled() + ); + materialized.generateStoreNameIfNeeded(nameProvider, storePrefix); final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -81,9 +82,11 @@ public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() { @Test public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() { - final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") - .withLoggingDisabled(), nameProvider, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( 
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") + .withLoggingDisabled() + ); + materialized.generateStoreNameIfNeeded(nameProvider, storePrefix); final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -94,10 +97,11 @@ public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() { @Test public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() { - final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") .withCachingDisabled() - .withLoggingDisabled(), nameProvider, storePrefix); + .withLoggingDisabled()); + materialized.generateStoreNameIfNeeded(nameProvider, storePrefix); final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -114,8 +118,9 @@ public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() { EasyMock.expect(supplier.get()).andReturn(store); EasyMock.replay(supplier); - final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier), nameProvider, storePrefix); + final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized = + new MaterializedInternal<>(Materialized.as(supplier)); + materialized.generateStoreNameIfNeeded(nameProvider, storePrefix); final 
KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize(); final KeyValueStore<String, Integer> built = builder.build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index f3dce521998..936c67b8ef8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -829,7 +829,9 @@ public void shouldUpdateStandbyTask() { final TopicPartition partition2 = t2p1; internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1)); - internalStreamsBuilder.table(topic2, new ConsumedInternal(), new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, "")); + final MaterializedInternal materialized = new MaterializedInternal(Materialized.as(storeName2)); + materialized.generateStoreNameIfNeeded(internalStreamsBuilder, ""); + internalStreamsBuilder.table(topic2, new ConsumedInternal(), materialized); final StreamThread thread = createStreamThread(clientId, config, false); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Remove deprecated APIs from KIP-120 and KIP-182 in Streams
> ----------------------------------------------------------
>
> Key: KAFKA-6813
> URL: https://issues.apache.org/jira/browse/KAFKA-6813
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 2.0.0
>
>
> As we move on to the next major release 2.0, we can consider removing the
> deprecated APIs from KIP-120 and KIP-182.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)