mjsax commented on code in PR #12188:
URL: https://github.com/apache/kafka/pull/12188#discussion_r915376733
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final
StoreBuilder<?> storeBuilder,
return this;
}
+ /**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
Review Comment:
nit: do we need a `<p>` tag here?
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final
StoreBuilder<?> storeBuilder,
return this;
}
+ /**
+ * Adds a Read Only {@link StateStore} to the topology.
Review Comment:
nit: `read-only`
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final
StoreBuilder<?> storeBuilder,
return this;
}
+ /**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.
Review Comment:
Proposal:
```
A read-only state store uses its input topic for fault-tolerance. Thus, in
contrast to regular state stores, it must never create an internal changelog
topic. Therefore, the input topic should be configured with log compaction.
```
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final
StoreBuilder<?> storeBuilder,
return this;
}
+ /**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.
+ * <p>
+ * A {@link SourceNode} will be added to consume the data arriving from
the partitions of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create an {@link
ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link
StateStore} up-to-date.
+ *
+ * @param storeBuilder user defined key value store builder
Review Comment:
If we are limited to kv-store, should we change the type to
`StoreBuilder<KeyValueStore>` (or similar)?
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final
StoreBuilder<?> storeBuilder,
return this;
}
+ /**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.
+ * <p>
+ * A {@link SourceNode} will be added to consume the data arriving from
the partitions of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create an {@link
ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link
StateStore} up-to-date.
+ *
+ * @param storeBuilder user defined key value store builder
+ * @param sourceName name of the {@link SourceNode} that will
be automatically added
+ * @param timestampExtractor the stateless timestamp extractor used for
this source,
+ * if not specified the default extractor
defined in the configs will be used
+ * @param keyDeserializer the {@link Deserializer} to deserialize
keys with
+ * @param valueDeserializer the {@link Deserializer} to deserialize
values with
+ * @param topic the topic to source the data from
+ * @param processorName the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor of state is already
registered
+ */
+ public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final
StoreBuilder<?> storeBuilder,
+ final String
sourceName,
+ final
TimestampExtractor timestampExtractor,
+ final
Deserializer<KIn> keyDeserializer,
+ final
Deserializer<VIn> valueDeserializer,
+ final String
topic,
+ final String
processorName,
+ final
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+ if (storeBuilder.loggingEnabled()) {
+ // -- disabling logging. We might want to print some logging.
+ storeBuilder.withLoggingDisabled();
Review Comment:
I think we should throw a `TopologyException` here (we do the same for
global stores if logging is enabled).
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java:
##########
@@ -59,7 +59,7 @@ public void testKStreamBranch() {
assertEquals(3, branches.length);
- final MockProcessorSupplier<Integer, String> supplier = new
MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String, Void, Void> supplier =
new MockProcessorSupplier<>();
Review Comment:
Seems this changes are not related to this PR (similar below)? Would be good
to exclude them and put into it's own PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]