This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32e97b1 MINOR: Remove deprecated parameter in ProcessorContext#register (#4911)
32e97b1 is described below
commit 32e97b1d9db46cab526d1882eaf9633934ed21bd
Author: Guozhang Wang <[email protected]>
AuthorDate: Mon May 7 09:22:26 2018 -0700
MINOR: Remove deprecated parameter in ProcessorContext#register (#4911)
Updated the upgrade doc as well, since we did not previously have an overload
without the deprecated parameter. Also renamed the 1.2 release version
to 2.0.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
docs/streams/upgrade-guide.html | 73 +++++++++++-----------
docs/upgrade.html | 25 ++++----
.../examples/wordcount/WordCountProcessorTest.java | 2 +-
.../kstream/internals/KStreamTransformValues.java | 3 +-
.../kafka/streams/processor/ProcessorContext.java | 7 +--
.../internals/AbstractProcessorContext.java | 1 -
.../state/internals/InMemoryKeyValueStore.java | 2 +-
.../streams/state/internals/MemoryLRUCache.java | 2 +-
.../internals/RocksDBSegmentedBytesStore.java | 2 +-
.../streams/state/internals/RocksDBStore.java | 2 +-
.../internals/AbstractProcessorContextTest.java | 6 +-
.../internals/ProcessorStateManagerTest.java | 2 +-
.../streams/state/KeyValueStoreTestDriver.java | 2 +-
.../kafka/test/InternalMockProcessorContext.java | 1 -
.../java/org/apache/kafka/test/MockStateStore.java | 2 +-
.../apache/kafka/test/NoOpProcessorContext.java | 1 -
.../streams/processor/MockProcessorContext.java | 1 -
.../kafka/streams/MockProcessorContextTest.java | 2 +-
18 files changed, 65 insertions(+), 71 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 462824f..646908d 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -34,28 +34,28 @@
</div>
<p>
- If you want to upgrade from 1.1.x to 1.2.0 and you have customized
window store implementations on the <code>ReadOnlyWindowStore</code> interface
+ If you want to upgrade from 1.1.x to 2.0.0 and you have customized
window store implementations on the <code>ReadOnlyWindowStore</code> interface,
you'd need to update your code to incorporate the newly added public
APIs; otherwise you don't need to make any code changes.
- See <a href="#streams_api_changes_120">below</a> for a complete list
of 1.2.0 API and semantic changes that allow you to advance your application
and/or simplify your code base.
+ See <a href="#streams_api_changes_200">below</a> for a complete list
of 2.0.0 API and semantic changes that allow you to advance your application
and/or simplify your code base.
</p>
<p>
- If you want to upgrade from 1.0.x to 1.2.0 and you have customized
window store implementations on the <code>ReadOnlyWindowStore</code> interface
+ If you want to upgrade from 1.0.x to 2.0.0 and you have customized
window store implementations on the <code>ReadOnlyWindowStore</code> interface,
you'd need to update your code to incorporate the newly added public
APIs.
Otherwise, if you are using Java 7 you don't need to make any code
changes as the public API is fully backward compatible;
but if you are using Java 8 method references in your Kafka Streams
code you might need to update your code to resolve method ambiguities.
Hot-swapping only the jar-file might not work for this case.
- See below a complete list of <a
href="#streams_api_changes_120">1.2.0</a> and <a
href="#streams_api_changes_110">1.1.0</a>
+ See below for a complete list of <a
href="#streams_api_changes_200">2.0.0</a> and <a
href="#streams_api_changes_110">1.1.0</a>
API and semantic changes that allow you to advance your application
and/or simplify your code base.
</p>
<p>
- If you want to upgrade from 0.10.2.x or 0.11.0.x to 1.2.x and you have
customized window store implementations on the <code>ReadOnlyWindowStore</code>
interface
+ If you want to upgrade from 0.10.2.x or 0.11.0.x to 2.0.x and you have
customized window store implementations on the <code>ReadOnlyWindowStore</code>
interface,
you'd need to update your code to incorporate the newly added public
APIs.
Otherwise, if you are using Java 7 you don't need to make any code
changes as the public API is fully backward compatible;
but if you are using Java 8 method references in your Kafka Streams
code you might need to update your code to resolve method ambiguities.
However, some public APIs were deprecated and thus it is recommended
to update your code eventually to allow for future upgrades.
- See below a complete list of <a
href="#streams_api_changes_120">1.2</a>, <a
href="#streams_api_changes_110">1.1</a>,
+ See below for a complete list of <a
href="#streams_api_changes_200">2.0</a>, <a
href="#streams_api_changes_110">1.1</a>,
<a href="#streams_api_changes_100">1.0</a>, and <a
href="#streams_api_changes_0110">0.11.0</a> API
and semantic changes that allow you to advance your application and/or
simplify your code base, including the usage of new features.
Additionally, Streams API 1.1.x requires broker on-disk message format
version 0.10 or higher; thus, you need to make sure that the message
@@ -63,45 +63,43 @@
</p>
<p>
- If you want to upgrade from 0.10.1.x to 1.2.x see the Upgrade Sections
for <a
href="/{{version}}/documentation/#upgrade_1020_streams"><b>0.10.2</b></a>,
+ If you want to upgrade from 0.10.1.x to 2.0.x, see the Upgrade Sections
for <a
href="/{{version}}/documentation/#upgrade_1020_streams"><b>0.10.2</b></a>,
<a
href="/{{version}}/documentation/#upgrade_1100_streams"><b>0.11.0</b></a>,
<a
href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>,
- <a
href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>, and
- <a
href="/{{version}}/documentation/#upgrade_110_streams"><b>1.2</b></a>.
- Note, that a brokers on-disk message format must be on version 0.10 or
higher to run a Kafka Streams application version 1.2 or higher.
+ <a
href="/{{version}}/documentation/#upgrade_110_streams"><b>1.1</b></a>, and
+ <a
href="/{{version}}/documentation/#upgrade_200_streams"><b>2.0</b></a>.
+ Note that a broker's on-disk message format must be at version 0.10 or
higher to run a Kafka Streams application version 2.0 or higher.
See below for a complete list of <a
href="#streams_api_changes_0102">0.10.2</a>, <a
href="#streams_api_changes_0110">0.11.0</a>,
- <a href="#streams_api_changes_100">1.0</a>, <a
href="#streams_api_changes_110">1.1</a>, and <a
href="#streams_api_changes_120">1.2</a>
+ <a href="#streams_api_changes_100">1.0</a>, <a
href="#streams_api_changes_110">1.1</a>, and <a
href="#streams_api_changes_200">2.0</a>
API and semantic changes that allow you to advance your application
and/or simplify your code base, including the usage of new features.
</p>
<p>
- Upgrading from 0.10.0.x to 1.2.0 directly is also possible.
+ Upgrading from 0.10.0.x to 2.0.0 directly is also possible.
Note that brokers must be on version 0.10.1 or higher and the on-disk
message format must be on version 0.10 or higher
- to run a Kafka Streams application version 1.2 or higher.
+ to run a Kafka Streams application version 2.0 or higher.
See <a href="#streams_api_changes_0101">Streams API changes in
0.10.1</a>, <a href="#streams_api_changes_0102">Streams API changes in
0.10.2</a>,
<a href="#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
<a href="#streams_api_changes_100">Streams API changes in 1.0</a>, and
- <a href="#streams_api_changes_110">Streams API changes in 1.1</a>, and
<a href="#streams_api_changes_120">Streams API changes in 1.2</a>
+ <a href="#streams_api_changes_110">Streams API changes in 1.1</a>, and
<a href="#streams_api_changes_200">Streams API changes in 2.0</a>
for a complete list of API changes.
- Upgrading to 1.2.0 requires two rolling bounces with config
<code>upgrade.from="0.10.0"</code> set for first upgrade phase
+ Upgrading to 2.0.0 requires two rolling bounces with config
<code>upgrade.from="0.10.0"</code> set for the first upgrade phase
(cf. <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
As an alternative, an offline upgrade is also possible.
</p>
<ul>
- <li> prepare your application instances for a rolling bounce and make
sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for
new version 1.2.0</li>
+ <li> prepare your application instances for a rolling bounce and make
sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for
the new version 2.0.0 (see the sketch below)</li>
<li> bounce each instance of your application once </li>
- <li> prepare your newly deployed 1.2.0 application instances for a
second round of rolling bounces; make sure to remove the value for config
<code>upgrade.mode</code> </li>
+ <li> prepare your newly deployed 2.0.0 application instances for a
second round of rolling bounces; make sure to remove the value for config
<code>upgrade.from</code> </li>
<li> bounce each instance of your application once more to complete
the upgrade </li>
</ul>
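A minimal sketch of the first-phase configuration for the rolling bounce above, assuming the StreamsConfig.UPGRADE_FROM_CONFIG constant for upgrade.from; the setting is removed again before the second round of bounces:

    final Properties props = new Properties();
    // phase 1: make the new 2.0.0 code use the old rebalance metadata version
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "0.10.0");
    final KafkaStreams streams = new KafkaStreams(topology, props);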
- <p> Upgrading from 0.10.0.x to 1.2.0 in offline mode: </p>
+ <p> Upgrading from 0.10.0.x to 2.0.0 in offline mode: </p>
<ul>
<li> stop all old (0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and
new jar file </li>
- <li> restart all new (1.2.0) application instances </li>
+ <li> restart all new (2.0.0) application instances </li>
</ul>
- <!-- TODO: verify release verion and update `id` and `href` attributes
(also at other places that link to this headline) -->
-
- <h3><a id="streams_api_changes_120"
href="#streams_api_changes_120">Streams API changes in 1.2.0</a></h3>
+ <h3><a id="streams_api_changes_200"
href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<p>
We have removed the <code>skippedDueToDeserializationError-rate</code>
and <code>skippedDueToDeserializationError-total</code> metrics.
Deserialization errors, and all other causes of record skipping, are
now accounted for in the pre-existing metrics
@@ -138,7 +136,6 @@
Users who have customized window store implementations on the
above interface need to update their code to implement the newly added
method as well.
For more details, see <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-261%3A+Add+Single+Value+Fetch+in+Window+Stores">KIP-261</a>.
</p>
-
<p>
We have added public <code>WindowedSerdes</code> to allow users to
read from / write to a topic storing windowed table changelogs directly.
In addition, in <code>StreamsConfig</code> we have also added
<code>default.windowed.key.serde.inner</code> and
<code>default.windowed.value.serde.inner</code>
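As a hedged illustration, reading a windowed changelog with the new serdes (the time-windowed factory method and String key type are assumptions for this sketch):

    final Serde<Windowed<String>> windowedSerde =
        WindowedSerdes.timeWindowedSerdeFrom(String.class);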
@@ -151,22 +148,26 @@
For more details, see <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
</p>
<p>
- Kafka 1.2.0 allows to manipulate timestamps of output records using the
Processor API (<a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
- To enable this new feature, <code>ProcessorContext#forward(...)</code>
was modified.
- The two existing overloads <code>#forward(Object key, Object value,
String childName)</code> and <code>#forward(Object key, Object value, int
childIndex)</code> were deprecated and a new overload <code>#forward(Object
key, Object value, To to)</code> was added.
- The new class <code>To</code> allows you to send records to all or
specific downstream processors by name and to set the timestamp for the output
record.
- Forwarding based on child index is not supported in the new API any
longer.
+ Kafka 2.0.0 allows you to manipulate timestamps of output records using
the Processor API (<a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
+ To enable this new feature, <code>ProcessorContext#forward(...)</code>
was modified.
+ The two existing overloads <code>#forward(Object key, Object value,
String childName)</code> and <code>#forward(Object key, Object value, int
childIndex)</code> were deprecated and a new overload <code>#forward(Object
key, Object value, To to)</code> was added.
+ The new class <code>To</code> allows you to send records to all or
specific downstream processors by name and to set the timestamp for the output
record.
+ Forwarding based on child index is no longer supported in the new API.
</p>
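A minimal sketch of the new overload described above ("my-child" is an illustrative processor name):

    // forward to one named child and set the output record's timestamp
    context.forward(key, value, To.child("my-child").withTimestamp(context.timestamp()));
    // or forward to all downstream processors
    context.forward(key, value, To.all());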
<p>
<a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a>
changed the retention time for repartition topics by setting its default value
to <code>Long.MAX_VALUE</code>.
Instead of relying on data retention, Kafka Streams now uses the new purge
data API to delete consumed data from those topics and keep the used storage
small.
</p>
<p>
- Kafka Streams DSL for Scala is a new Kafka Streams client library
available for developers authoring Kafka Streams applications in Scala. It
wraps core Kafka Streams DSL types to make it easier to call when
- interoperating with Scala code. For example, it includes higher order
functions as parameters for transformations avoiding the need anonymous classes
in Java 7 or experimental SAM type conversions in Scala 2.11, automatic
conversion between Java and Scala collection types, a way
- to implicitly provide SerDes to reduce boilerplate from your application
and make it more typesafe, and more! For more information see the
- <a
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka
Streams DSL for Scala documentation</a> and
- <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams">KIP-270</a>.
+ We have modified the <code>ProcessorStateManager#register(...)</code>
signature and removed the deprecated <code>loggingEnabled</code> boolean
parameter, as it is now specified via the <code>StoreBuilder</code>.
+ Users who used this function to register their state stores with the
processor topology simply need to update their code and remove this parameter
from the call.
+ </p>
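Since logging is now specified on the store builder, a hedged sketch of enabling it there instead (store name and serdes are illustrative):

    final StoreBuilder<KeyValueStore<String, Long>> builder =
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("my-store"),
                Serdes.String(), Serdes.Long())
              .withLoggingEnabled(Collections.emptyMap()); // replaces the removed boolean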
+ <p>
+ Kafka Streams DSL for Scala is a new Kafka Streams client library
available for developers authoring Kafka Streams applications in Scala. It
wraps core Kafka Streams DSL types to make it easier to call when
+ interoperating with Scala code. For example, it includes higher-order
functions as parameters for transformations, avoiding the need for anonymous classes
in Java 7 or experimental SAM type conversions in Scala 2.11, automatic
conversion between Java and Scala collection types, a way
+ to implicitly provide SerDes to reduce boilerplate from your
application and make it more typesafe, and more! For more information see the
+ <a
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka
Streams DSL for Scala documentation</a> and
+ <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams">KIP-270</a>.
</p>
<h3><a id="streams_api_changes_110"
href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
@@ -177,9 +178,9 @@
</p>
<p>
- There is a new artifact <code>kafka-streams-test-utils</code> providing
a <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and
<code>OutputVerifier</code> class.
- You can include the new artifact as a regular dependency to your unit
tests and use the test driver to test your business logic of your Kafka Streams
application.
- For more details, see <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
+ There is a new artifact <code>kafka-streams-test-utils</code>
providing a <code>TopologyTestDriver</code>,
<code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> class.
+ You can include the new artifact as a regular dependency in your
unit tests and use the test driver to test the business logic of your Kafka
Streams application.
+ For more details, see <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
</p>
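A minimal sketch of the test-utils flow (topic names and serdes are illustrative):

    final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
    final ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
    driver.pipeInput(factory.create("input-topic", "key", "value"));
    OutputVerifier.compareKeyValue(
        driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer()),
        "key", "value");
    driver.close();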
<p>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 8bfc61e..451f103 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -20,9 +20,9 @@
<script id="upgrade-template" type="text/x-handlebars-template">
-<h4><a id="upgrade_1_2_0" href="#upgrade_1_2_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 1.2.x</a></h4>
-<p>Kafka 1.2.0 introduces wire protocol changes. By following the recommended
rolling upgrade plan below,
- you guarantee no downtime during the upgrade. However, please review the
<a href="#upgrade_120_notable">notable changes in 1.2.0</a> before upgrading.
+<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.x</a></h4>
+<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended
rolling upgrade plan below,
+ you guarantee no downtime during the upgrade. However, please review the
<a href="#upgrade_200_notable">notable changes in 2.0.0</a> before upgrading.
</p>
<p><b>For a rolling upgrade:</b></p>
@@ -48,7 +48,7 @@
<li> Restart the brokers one by one for the new protocol version to take
effect.</li>
<li> If you have overridden the message format version as instructed
above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have
been upgraded to 0.11.0 or later,
- change log.message.format.version to 1.2 on each broker and restart
them one by one. Note that the older Scala consumer
+ change log.message.format.version to 2.0 on each broker and restart
them one by one. Note that the older Scala consumer
does not support the new message format introduced in 0.11, so to
avoid the performance cost of down-conversion (or to
take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly
once semantics</a>), the newer Java consumer must be used.</li>
</ol>
@@ -64,7 +64,7 @@
Hot-swapping the jar-file only might not work.</li>
</ol>
-<h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable changes in
1.2.0</a></h5>
+<h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in
2.0.0</a></h5>
<ul>
<li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a>
increases the default offset retention time from 1 day to 7 days. This makes it
less likely to "lose" offsets in an application that commits infrequently. It
also increases the active set of offsets and therefore can increase memory
usage on the broker. Note that the console consumer currently enables offset
commit by default and can be the source of a large number of offsets which this
change will now preserve for [...]
<li><a
href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a> extends
the lower interval of <code>max.connections.per.ip minimum</code> to zero and
therefore allows IP-based filtering of inbound connections.</li>
@@ -74,18 +74,19 @@
JMX monitoring tools that do not automatically aggregate. To get the
total count for a specific request type, the tool needs to be
updated to aggregate across different versions.
</li>
- <li> New Kafka Streams configuration parameter <code>upgrade.from</code>
added that allows rolling bounce upgrade from older version. </li>
- <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a>
changed the retention time for repartition topics by setting its default value
to <code>Long.MAX_VALUE</code>.</li>
+ <li>New Kafka Streams configuration parameter <code>upgrade.from</code>
added that allows rolling bounce upgrades from an older version.</li>
+ <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a>
changed the retention time for Kafka Streams repartition topics by setting its
default value to <code>Long.MAX_VALUE</code>.</li>
+ <li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams for
registering state stores with the processor topology. For more details, please
read the Streams <a
href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade
Guide</a>.</li>
</ul>
-<h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New
Protocol Versions</a></h5>
+<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New
Protocol Versions</a></h5>
<ul></ul>
-<h5><a id="upgrade_120_streams" href="#upgrade_120_streams">Upgrading a 1.2.0
Kafka Streams Application</a></h5>
+<h5><a id="upgrade_200_streams" href="#upgrade_200_streams">Upgrading a 2.0.0
Kafka Streams Application</a></h5>
<ul>
- <li> Upgrading your Streams application from 1.1.0 to 1.2.0 does not
require a broker upgrade.
- A Kafka Streams 1.2.0 application can connect to 1.2, 1.1, 1.0,
0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0
brokers though). </li>
- <li> See <a
href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams
API changes in 1.2.0</a> for more details. </li>
+ <li> Upgrading your Streams application from 1.1.0 to 2.0.0 does not
require a broker upgrade.
+ A Kafka Streams 2.0.0 application can connect to 2.0, 1.1, 1.0,
0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0
brokers though). </li>
+ <li> See <a
href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Streams
API changes in 2.0.0</a> for more details. </li>
</ul>
<h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x or 1.0.x to 1.1.x</a></h4>
diff --git
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index 566b7d4..faced6d 100644
---
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -44,7 +44,7 @@ public class WordCountProcessorTest {
.withLoggingDisabled() // Changelog is not supported by
MockProcessorContext.
.build();
store.init(context, store);
- context.register(store, false, null);
+ context.register(store, null);
// Create and initialize the processor under test
final Processor<String, String> processor = new
WordCountProcessorDemo.MyProcessorSupplier().get();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index d09fae2..fb6af34 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -91,9 +91,8 @@ public class KStreamTransformValues<K, V, R> implements
ProcessorSupplier<K, V>
@Override
public void register(final StateStore store,
- final boolean
deprecatedAndIgnoredLoggingEnabled,
final StateRestoreCallback
stateRestoreCallback) {
- context.register(store,
deprecatedAndIgnoredLoggingEnabled, stateRestoreCallback);
+ context.register(store, stateRestoreCallback);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 93a1455..79d191c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -76,15 +76,12 @@ public interface ProcessorContext {
* Registers and possibly restores the specified storage engine.
*
* @param store the storage engine
- * @param loggingEnabledIsDeprecatedAndIgnored deprecated parameter {@code
loggingEnabled} is ignored:
- * if you want to enable
logging on a state stores call
- * {@link
org.apache.kafka.streams.state.StoreBuilder#withLoggingEnabled(Map)}
- * when creating the store
+ * @param stateRestoreCallback the restoration callback logic for
log-backed state stores upon restart
+ *
* @throws IllegalStateException If store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not contain the
partition
*/
void register(final StateStore store,
- final boolean loggingEnabledIsDeprecatedAndIgnored,
final StateRestoreCallback stateRestoreCallback);
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index fc3067e..9687477 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -92,7 +92,6 @@ public abstract class AbstractProcessorContext implements
InternalProcessorConte
@Override
public void register(final StateStore store,
- final boolean deprecatedAndIgnoredLoggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
if (initialized) {
throw new IllegalStateException("Can only create state stores
during initialization.");
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 2cdfc4b..9ea75a9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -72,7 +72,7 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
if (root != null) {
// register the store
- context.register(root, false, new StateRestoreCallback() {
+ context.register(root, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
// this is a delete
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 2785540..b99c907 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -111,7 +111,7 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
// register the store
- context.register(root, false, new StateRestoreCallback() {
+ context.register(root, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
restoring = true;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 865703c..ec9e6f7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -126,7 +126,7 @@ class RocksDBSegmentedBytesStore implements
SegmentedBytesStore {
segments.openExisting(context);
// register and possibly restore the state from the logs
- context.register(root, false, new StateRestoreCallback() {
+ context.register(root, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
put(Bytes.wrap(key), value);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index f54c783..2813041 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -169,7 +169,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]> {
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
- context.register(root, false, this.batchingStateRestoreCallback);
+ context.register(root, this.batchingStateRestoreCallback);
}
private RocksDB openDB(final File dir,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 43dc38e..86806b2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -55,7 +55,7 @@ public class AbstractProcessorContextTest {
public void
shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() {
context.initialized();
try {
- context.register(stateStore, false, null);
+ context.register(stateStore, null);
fail("should throw illegal state exception when context already
initialized");
} catch (IllegalStateException e) {
// pass
@@ -64,12 +64,12 @@ public class AbstractProcessorContextTest {
@Test
public void
shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized() {
- context.register(stateStore, false, null);
+ context.register(stateStore, null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnRegisterIfStateStoreIsNull() {
- context.register(null, false, null);
+ context.register(null, null);
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 31f07cc..6a20cd9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -687,7 +687,7 @@ public class ProcessorStateManagerTest {
stateManager.reinitializeStateStoresForPartitions(changelogPartitions,
new NoOpProcessorContext() {
@Override
- public void register(final StateStore store, final boolean
deprecatedAndIgnoredLoggingEnabled, final StateRestoreCallback
stateRestoreCallback) {
+ public void register(final StateStore store, final
StateRestoreCallback stateRestoreCallback) {
stateManager.register(store, stateRestoreCallback);
}
});
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 33591c6..4e80fa7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -98,7 +98,7 @@ import java.util.Set;
*
* <h2>Restoring a store</h2>
* This component can be used to test whether a {@link KeyValueStore}
implementation properly
- * {@link ProcessorContext#register(StateStore, boolean, StateRestoreCallback)
registers itself} with the {@link ProcessorContext}, so that
+ * {@link ProcessorContext#register(StateStore, StateRestoreCallback)
registers itself} with the {@link ProcessorContext}, so that
* the persisted contents of a store are properly restored from the flushed
entries when the store instance is started.
* <p>
* To do this, create an instance of this driver component, {@link
#addEntryToRestoreLog(Object, Object) add entries} that will be
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 27a0094..eb72e13 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -181,7 +181,6 @@ public class InternalMockProcessorContext extends
AbstractProcessorContext imple
@Override
public void register(final StateStore store,
- final boolean deprecatedAndIgnoredLoggingEnabled,
final StateRestoreCallback func) {
storeMap.put(store.name(), store);
restoreFuncs.put(store.name(), func);
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
b/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
index f218f04..a2b0d21 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
@@ -47,7 +47,7 @@ public class MockStateStore implements StateStore {
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- context.register(root, false, stateRestoreCallback);
+ context.register(root, stateRestoreCallback);
initialized = true;
closed = false;
}
diff --git
a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index e931c7e..92f84c5 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -87,7 +87,6 @@ public class NoOpProcessorContext extends
AbstractProcessorContext {
@Override
public void register(final StateStore store,
- final boolean deprecatedAndIgnoredLoggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
// no-op
}
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index c387c36..3e29cde 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -337,7 +337,6 @@ public class MockProcessorContext implements
ProcessorContext, RecordCollector.S
@Override
public void register(final StateStore store,
- final boolean loggingEnabledIsDeprecatedAndIgnored,
final StateRestoreCallback
stateRestoreCallbackIsIgnoredInMock) {
stateStores.put(store.name(), store);
}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 934e043..dbb26e0 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -240,7 +240,7 @@ public class MockProcessorContextTest {
final MockProcessorContext context = new MockProcessorContext();
final KeyValueStore<String, Long> store = new
InMemoryKeyValueStore<>("my-state", Serdes.String(), Serdes.Long());
- context.register(store, false, null);
+ context.register(store, null);
store.init(context, store);
processor.init(context);
--
To stop receiving notification emails like this one, please contact
[email protected].