This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa1702f MINOR: Remove deprecated valueTransformer.punctuate (#4993)
fa1702f is described below
commit fa1702fece04c5fc50149fc9b05d77a459b7180b
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu May 10 09:50:59 2018 -0700
MINOR: Remove deprecated valueTransformer.punctuate (#4993)
Also removed InternalValueTransformerWithKey and
InternalValueTransformerWithKeySupplier, which were used to mock away the
deprecated punctuate function.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/kstream/ValueTransformer.java | 22 -----------
.../streams/kstream/internals/AbstractStream.java | 46 ++--------------------
.../internals/InternalValueTransformerWithKey.java | 24 -----------
.../InternalValueTransformerWithKeySupplier.java | 21 ----------
.../streams/kstream/internals/KStreamImpl.java | 10 ++---
.../kstream/internals/KStreamTransformValues.java | 10 +++--
.../kstream/internals/AbstractStreamTest.java | 35 ++++++++--------
.../internals/KStreamTransformValuesTest.java | 16 ++------
8 files changed, 34 insertions(+), 150 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1da779e..c0da38a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.To;
/**
@@ -85,27 +84,6 @@ public interface ValueTransformer<V, VR> {
VR transform(final V value);
/**
- * Perform any periodic operations if this processor {@link
ProcessorContext#schedule(long) schedule itself} with
- * the context during {@link #init(ProcessorContext) initialization}.
- * <p>
- * It is not possible to return any new output records within {@code
punctuate}.
- * Using {@link ProcessorContext#forward(Object, Object)} or {@link
ProcessorContext#forward(Object, Object, To)}
- * will result in an {@link StreamsException exception}.
- * Furthermore, {@code punctuate} must return {@code null}.
- * <p>
- * Note, that {@code punctuate} is called base on <it>stream time</it>
(i.e., time progress with regard to
- * timestamps return by the used {@link TimestampExtractor})
- * and not based on wall-clock time.
- *
- * @deprecated Please use {@link Punctuator} functional interface instead.
- *
- * @param timestamp the stream time when {@code punctuate} is being called
- * @return must return {@code null}—otherwise, an {@link
StreamsException exception} will be thrown
- */
- @Deprecated
- VR punctuate(final long timestamp);
-
- /**
* Close this processor and clean up any resources.
* <p>
* It is not possible to return any new output records within {@code
close()}.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 497bdac..7bc7a15 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
@@ -84,19 +83,13 @@ public abstract class AbstractStream<K> {
};
}
- static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR>
toInternalValueTransformerSupplier(final ValueTransformerSupplier<V, VR>
valueTransformerSupplier) {
+ static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR>
toValueTransformerWithKeySupplier(final ValueTransformerSupplier<V, VR>
valueTransformerSupplier) {
Objects.requireNonNull(valueTransformerSupplier,
"valueTransformerSupplier can't be null");
- return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
+ return new ValueTransformerWithKeySupplier<K, V, VR>() {
@Override
- public InternalValueTransformerWithKey<K, V, VR> get() {
+ public ValueTransformerWithKey<K, V, VR> get() {
final ValueTransformer<V, VR> valueTransformer =
valueTransformerSupplier.get();
- return new InternalValueTransformerWithKey<K, V, VR>() {
- @SuppressWarnings("deprecation")
- @Override
- public VR punctuate(final long timestamp) {
- return valueTransformer.punctuate(timestamp);
- }
-
+ return new ValueTransformerWithKey<K, V, VR>() {
@Override
public void init(final ProcessorContext context) {
valueTransformer.init(context);
@@ -115,35 +108,4 @@ public abstract class AbstractStream<K> {
}
};
}
-
- static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR>
toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier<K, V,
VR> valueTransformerWithKeySupplier) {
- Objects.requireNonNull(valueTransformerWithKeySupplier,
"valueTransformerSupplier can't be null");
- return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
- @Override
- public InternalValueTransformerWithKey<K, V, VR> get() {
- final ValueTransformerWithKey<K, V, VR>
valueTransformerWithKey = valueTransformerWithKeySupplier.get();
- return new InternalValueTransformerWithKey<K, V, VR>() {
- @Override
- public VR punctuate(final long timestamp) {
- throw new
StreamsException("ValueTransformerWithKey#punctuate should not be called.");
- }
-
- @Override
- public void init(final ProcessorContext context) {
- valueTransformerWithKey.init(context);
- }
-
- @Override
- public VR transform(final K readOnlyKey, final V value) {
- return valueTransformerWithKey.transform(readOnlyKey,
value);
- }
-
- @Override
- public void close() {
- valueTransformerWithKey.close();
- }
- };
- }
- };
- }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
deleted file mode 100644
index 636e409..0000000
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-
-public interface InternalValueTransformerWithKey<K, V, VR> extends
ValueTransformerWithKey<K, V, VR> {
- VR punctuate(final long timestamp);
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
deleted file mode 100644
index 3418e71..0000000
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-public interface InternalValueTransformerWithKeySupplier<K, V, VR> {
- InternalValueTransformerWithKey<K, V, VR> get();
-}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2ddd5ff..b8195a0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -348,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
final String...
stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier,
"valueTransformSupplier can't be null");
- return
transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier),
stateStoreNames);
+ return
doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier),
stateStoreNames);
}
@Override
@@ -356,13 +356,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
final String...
stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier,
"valueTransformSupplier can't be null");
- return
transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier),
stateStoreNames);
+ return doTransformValues(valueTransformerSupplier, stateStoreNames);
}
- private <VR> KStream<K, VR> transformValues(final
InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR>
internalValueTransformerWithKeySupplier,
- final String...
stateStoreNames) {
+ private <VR> KStream<K, VR> doTransformValues(final
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR>
valueTransformerWithKeySupplier,
+ final String...
stateStoreNames) {
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
- builder.internalTopologyBuilder.addProcessor(name, new
KStreamTransformValues<>(internalValueTransformerWithKeySupplier), this.name);
+ builder.internalTopologyBuilder.addProcessor(name, new
KStreamTransformValues<>(valueTransformerWithKeySupplier), this.name);
if (stateStoreNames != null && stateStoreNames.length > 0) {
builder.internalTopologyBuilder.connectProcessorAndStateStores(name,
stateStoreNames);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index fb6af34..d45b7cf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -35,9 +37,9 @@ import java.util.Map;
public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K,
V> {
- private final InternalValueTransformerWithKeySupplier<K, V, R>
valueTransformerSupplier;
+ private final ValueTransformerWithKeySupplier<K, V, R>
valueTransformerSupplier;
- public KStreamTransformValues(final
InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier) {
+ KStreamTransformValues(final ValueTransformerWithKeySupplier<K, V, R>
valueTransformerSupplier) {
this.valueTransformerSupplier = valueTransformerSupplier;
}
@@ -48,10 +50,10 @@ public class KStreamTransformValues<K, V, R> implements
ProcessorSupplier<K, V>
public static class KStreamTransformValuesProcessor<K, V, R> implements
Processor<K, V> {
- private final InternalValueTransformerWithKey<K, V, R>
valueTransformer;
+ private final ValueTransformerWithKey<K, V, R> valueTransformer;
private ProcessorContext context;
- public KStreamTransformValuesProcessor(final
InternalValueTransformerWithKey<K, V, R> valueTransformer) {
+ KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R>
valueTransformer) {
this.valueTransformer = valueTransformer;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 1f9bcba..a37b6f8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -47,29 +47,26 @@ public class AbstractStreamTest {
@Test
public void testToInternlValueTransformerSupplierSuppliesNewTransformers()
{
- final ValueTransformerSupplier vts =
createMock(ValueTransformerSupplier.class);
- expect(vts.get()).andReturn(null).times(3);
- final InternalValueTransformerWithKeySupplier ivtwks =
- AbstractStream.toInternalValueTransformerSupplier(vts);
- replay(vts);
- ivtwks.get();
- ivtwks.get();
- ivtwks.get();
- verify(vts);
+ final ValueTransformerSupplier valueTransformerSupplier =
createMock(ValueTransformerSupplier.class);
+ expect(valueTransformerSupplier.get()).andReturn(null).times(3);
+ final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier =
+
AbstractStream.toValueTransformerWithKeySupplier(valueTransformerSupplier);
+ replay(valueTransformerSupplier);
+ valueTransformerWithKeySupplier.get();
+ valueTransformerWithKeySupplier.get();
+ valueTransformerWithKeySupplier.get();
+ verify(valueTransformerSupplier);
}
@Test
public void
testToInternalValueTransformerSupplierSuppliesNewTransformers() {
- final ValueTransformerWithKeySupplier vtwks =
- createMock(ValueTransformerWithKeySupplier.class);
- expect(vtwks.get()).andReturn(null).times(3);
- final InternalValueTransformerWithKeySupplier ivtwks =
- AbstractStream.toInternalValueTransformerSupplier(vtwks);
- replay(vtwks);
- ivtwks.get();
- ivtwks.get();
- ivtwks.get();
- verify(vtwks);
+ final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier
= createMock(ValueTransformerWithKeySupplier.class);
+ expect(valueTransformerWithKeySupplier.get()).andReturn(null).times(3);
+ replay(valueTransformerWithKeySupplier);
+ valueTransformerWithKeySupplier.get();
+ valueTransformerWithKeySupplier.get();
+ valueTransformerWithKeySupplier.get();
+ verify(valueTransformerWithKeySupplier);
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 419e6f1..807fb1f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -69,11 +69,6 @@ public class KStreamTransformValuesTest {
}
@Override
- public Integer punctuate(long timestamp) {
- return null;
- }
-
- @Override
public void close() {
}
};
@@ -143,15 +138,10 @@ public class KStreamTransformValuesTest {
@Test
public void
shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() {
final BadValueTransformer badValueTransformer = new
BadValueTransformer();
- final KStreamTransformValues<Integer, Integer, Integer> transformValue
= new KStreamTransformValues<>(new
InternalValueTransformerWithKeySupplier<Integer, Integer, Integer>() {
+ final KStreamTransformValues<Integer, Integer, Integer> transformValue
= new KStreamTransformValues<>(new ValueTransformerWithKeySupplier<Integer,
Integer, Integer>() {
@Override
- public InternalValueTransformerWithKey<Integer, Integer, Integer>
get() {
- return new InternalValueTransformerWithKey<Integer, Integer,
Integer>() {
- @Override
- public Integer punctuate(long timestamp) {
- throw new
StreamsException("ValueTransformerWithKey#punctuate should not be called.");
- }
-
+ public ValueTransformerWithKey<Integer, Integer, Integer> get() {
+ return new ValueTransformerWithKey<Integer, Integer,
Integer>() {
@Override
public void init(final ProcessorContext context) {
badValueTransformer.init(context);
--
To stop receiving notification emails like this one, please contact
[email protected].