This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new acd3858 KAFKA-7396: Materialized, Serialized, Joined, Consumed and
Produced with implicit Serdes (#5551)
acd3858 is described below
commit acd3858ea69e676a7840d998240deb32aee62dc0
Author: Joan Goyeau <[email protected]>
AuthorDate: Tue Sep 11 22:08:42 2018 +0100
KAFKA-7396: Materialized, Serialized, Joined, Consumed and Produced with
implicit Serdes (#5551)
We want to make sure that we always have a serde for all Materialized,
Serialized, Joined, Consumed and Produced.
For that we can make use of the implicit parameters in Scala.
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-365%3A+Materialized%2C+Serialized%2C+Joined%2C+Consumed+and+Produced+with+implicit+Serde
Reviewers: John Roesler <[email protected]>, Matthias J.
Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang
<[email protected]>, Ted Yu <[email protected]>
---
.../org/apache/kafka/streams/kstream/Produced.java | 3 +-
.../kstream/internals/ProducedInternal.java | 2 +-
.../kstream/internals/SerializedInternal.java | 9 +-
.../kafka/streams/scala/ImplicitConversions.scala | 15 ++-
.../kafka/streams/scala/StreamsBuilder.scala | 2 +-
.../kafka/streams/scala/kstream/Consumed.scala | 79 +++++++++++++++
.../kafka/streams/scala/kstream/Joined.scala | 42 ++++++++
.../streams/scala/kstream/KGroupedStream.scala | 4 +-
.../streams/scala/kstream/KGroupedTable.scala | 4 +-
.../kafka/streams/scala/kstream/Materialized.scala | 107 +++++++++++++++++++++
.../kafka/streams/scala/kstream/Produced.scala | 59 ++++++++++++
.../kafka/streams/scala/kstream/Serialized.scala | 36 +++++++
.../scala/kstream/TimeWindowedKStream.scala | 4 +-
.../kafka/streams/scala/kstream/package.scala} | 24 ++---
.../apache/kafka/streams/scala/TopologyTest.scala | 21 +++-
.../apache/kafka/streams/scala/WordCountTest.scala | 1 -
.../kafka/streams/scala/kstream/ConsumedTest.scala | 73 ++++++++++++++
.../kafka/streams/scala/kstream/JoinedTest.scala} | 34 +++----
.../streams/scala/{ => kstream}/KStreamTest.scala | 5 +-
.../streams/scala/{ => kstream}/KTableTest.scala | 9 +-
.../streams/scala/kstream/MaterializedTest.scala | 84 ++++++++++++++++
.../kafka/streams/scala/kstream/ProducedTest.scala | 51 ++++++++++
.../streams/scala/kstream/SerializedTest.scala} | 35 ++++---
23 files changed, 615 insertions(+), 88 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index 8d2742a..a3d96bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -71,7 +71,8 @@ public class Produced<K, V> {
* @param valueSerde Serde to use for serializing the value
* @param partitioner the function used to determine how records are
distributed among partitions of the topic,
* if not specified and {@code keySerde} provides a
{@link WindowedSerializer} for the key
- * {@link WindowedStreamPartitioner} will be
used—otherwise {@link DefaultPartitioner} wil be used
+ * {@link WindowedStreamPartitioner} will be
used—otherwise {@link DefaultPartitioner}
+ * will be used
* @param <K> key type
* @param <V> value type
* @return A new {@link Produced} instance configured with keySerde,
valueSerde, and partitioner
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
index 3197244..358982b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class ProducedInternal<K, V> extends Produced<K, V> {
- ProducedInternal(final Produced<K, V> produced) {
+ public ProducedInternal(final Produced<K, V> produced) {
super(produced);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
index fb802ea..c6df11f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
@@ -19,17 +19,16 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Serialized;
-class SerializedInternal<K, V> extends Serialized<K, V> {
- SerializedInternal(final Serialized<K, V> serialized) {
+public class SerializedInternal<K, V> extends Serialized<K, V> {
+ public SerializedInternal(final Serialized<K, V> serialized) {
super(serialized);
}
- Serde<K> keySerde() {
+ public Serde<K> keySerde() {
return keySerde;
}
- Serde<V> valueSerde() {
+ public Serde<V> valueSerde() {
return valueSerde;
}
-
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index d1ff674..c2ac1ff 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -25,14 +25,13 @@ import org.apache.kafka.streams.kstream.{
KStream => KStreamJ,
KTable => KTableJ,
SessionWindowedKStream => SessionWindowedKStreamJ,
- TimeWindowedKStream => TimeWindowedKStreamJ,
- _
+ TimeWindowedKStream => TimeWindowedKStreamJ
}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.common.serialization.Serde
-import scala.language.implicitConversions
+import scala.language.implicitConversions
import org.apache.kafka.streams.processor.StateStore
/**
@@ -65,20 +64,20 @@ object ImplicitConversions {
// and these implicits will convert them to `Serialized`, `Produced` or
`Consumed`
implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K],
valueSerde: Serde[V]): Serialized[K, V] =
- Serialized.`with`(keySerde, valueSerde)
+ Serialized.`with`[K, V]
implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K],
valueSerde: Serde[V]): Consumed[K, V] =
- Consumed.`with`(keySerde, valueSerde)
+ Consumed.`with`[K, V]
implicit def producedFromSerde[K, V](implicit keySerde: Serde[K],
valueSerde: Serde[V]): Produced[K, V] =
- Produced.`with`(keySerde, valueSerde)
+ Produced.`with`[K, V]
implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde:
Serde[K],
valueSerde:
Serde[V]): Materialized[K, V, S] =
- Materialized.`with`[K, V, S](keySerde, valueSerde)
+ Materialized.`with`[K, V, S]
implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde:
Serde[K],
valueSerde: Serde[V],
otherValueSerde:
Serde[VO]): Joined[K, V, VO] =
- Joined.`with`(keySerde, valueSerde, otherValueSerde)
+ Joined.`with`[K, V, VO]
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index fcec778..8c5a9b3 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -21,7 +21,7 @@ package org.apache.kafka.streams.scala
import java.util.regex.Pattern
-import org.apache.kafka.streams.kstream.{Consumed, GlobalKTable, Materialized}
+import org.apache.kafka.streams.kstream.GlobalKTable
import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
import org.apache.kafka.streams.state.StoreBuilder
import org.apache.kafka.streams.{Topology, StreamsBuilder => StreamsBuilderJ}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
new file mode 100644
index 0000000..a105ed6
--- /dev/null
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.processor.TimestampExtractor
+
+object Consumed {
+
+ /**
+ * Create an instance of [[Consumed]] with the supplied arguments. `null`
values are acceptable.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @param timestampExtractor the timestamp extractor to use. If `null` the
default timestamp extractor from
+ * config will be used
+ * @param resetPolicy the offset reset policy to be used. If `null`
the default reset policy from config
+ * will be used
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @return a new instance of [[Consumed]]
+ */
+ def `with`[K, V](
+ timestampExtractor: TimestampExtractor,
+ resetPolicy: Topology.AutoOffsetReset
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
+ ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
+
+ /**
+ * Create an instance of [[Consumed]] with key and value [[Serde]]s.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @return a new instance of [[Consumed]]
+ */
+ def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]):
ConsumedJ[K, V] =
+ ConsumedJ.`with`(keySerde, valueSerde)
+
+ /**
+ * Create an instance of [[Consumed]] with a [[TimestampExtractor]].
+ *
+ * @param timestampExtractor the timestamp extractor to use. If `null` the
default timestamp extractor from
+ * config will be used
+ * @tparam K key type
+ * @tparam V value type
+ * @return a new instance of [[Consumed]]
+ */
+ def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde:
Serde[K],
+ valueSerde:
Serde[V]): ConsumedJ[K, V] =
+
ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+ /**
+ * Create an instance of [[Consumed]] with a [[Topology.AutoOffsetReset]].
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @param resetPolicy the offset reset policy to be used. If `null` the
default reset policy from config will be used
+ * @return a new instance of [[Consumed]]
+ */
+ def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde:
Serde[K],
+ valueSerde:
Serde[V]): ConsumedJ[K, V] =
+
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
+}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
new file mode 100644
index 0000000..ffd3e61
--- /dev/null
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Joined => JoinedJ}
+
+object Joined {
+
+ /**
+ * Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with
key, value, and otherValue [[Serde]]
+ * instances.
+ * `null` values are accepted and will be replaced by the default serdes as
defined in config.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @tparam VO other value type
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @param otherValueSerde the otherValue serde to use. If `null` the default
value serde from config will be used
+ * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the
provided serdes
+ */
+ def `with`[K, V, VO](implicit keySerde: Serde[K],
+ valueSerde: Serde[V],
+ otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
+ JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
+
+}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index f6a22d9..5d0f05e 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
/**
* Wraps the Java class KGroupedStream and delegates method calls to the
underlying Java object.
*
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
* @param inner The underlying Java abstraction for KGroupedStream
*
* @see `org.apache.kafka.streams.kstream.KGroupedStream`
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 76ea9ed..56f84e3 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
/**
* Wraps the Java class KGroupedTable and delegates method calls to the
underlying Java object.
*
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
* @param inner The underlying Java abstraction for KGroupedTable
*
* @see `org.apache.kafka.streams.kstream.KGroupedTable`
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
new file mode 100644
index 0000000..eb126f0
--- /dev/null
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Materialized => MaterializedJ}
+import org.apache.kafka.streams.processor.StateStore
+import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore,
ByteArraySessionStore, ByteArrayWindowStore}
+import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier,
SessionBytesStoreSupplier, WindowBytesStoreSupplier}
+
+object Materialized {
+
+ /**
+ * Materialize a [[StateStore]] with the provided key and value [[Serde]]s.
+ * An internal name will be used for the store.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @tparam S store type
+ * @param keySerde the key [[Serde]] to use.
+ * @param valueSerde the value [[Serde]] to use.
+ * @return a new [[Materialized]] instance with the given key and value
serdes
+ */
+ def `with`[K, V, S <: StateStore](implicit keySerde: Serde[K], valueSerde:
Serde[V]): MaterializedJ[K, V, S] =
+ MaterializedJ.`with`(keySerde, valueSerde)
+
+ /**
+ * Materialize a [[StateStore]] with the given name.
+ *
+ * @tparam K key type of the store
+ * @tparam V value type of the store
+ * @tparam S type of the [[StateStore]]
+ * @param storeName the name of the underlying
[[org.apache.kafka.streams.scala.kstream.KTable]] state store;
+ * valid characters are ASCII alphanumerics, '.', '_' and
'-'.
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @return a new [[Materialized]] instance with the given storeName
+ */
+ def as[K, V, S <: StateStore](storeName: String)(implicit keySerde: Serde[K],
+ valueSerde: Serde[V]):
MaterializedJ[K, V, S] =
+
MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+ /**
+ * Materialize a [[org.apache.kafka.streams.state.WindowStore]] using the
provided [[WindowBytesStoreSupplier]].
+ *
+ * Important: Custom subclasses are allowed here, but they should respect
the retention contract:
+ * Window stores are required to retain windows at least as long as (window
size + window grace period).
+ * Stores constructed via [[org.apache.kafka.streams.state.Stores]] already
satisfy this contract.
+ *
+ * @tparam K key type of the store
+ * @tparam V value type of the store
+ * @param supplier the [[WindowBytesStoreSupplier]] used to materialize
the store
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @return a new [[Materialized]] instance with the given supplier
+ */
+ def as[K, V](supplier: WindowBytesStoreSupplier)(implicit keySerde: Serde[K],
+ valueSerde: Serde[V]):
MaterializedJ[K, V, ByteArrayWindowStore] =
+
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+ /**
+ * Materialize a [[org.apache.kafka.streams.state.SessionStore]] using the
provided [[SessionBytesStoreSupplier]].
+ *
+ * Important: Custom subclasses are allowed here, but they should respect
the retention contract:
+ * Session stores are required to retain windows at least as long as
(session inactivity gap + session grace period).
+ * Stores constructed via [[org.apache.kafka.streams.state.Stores]] already
satisfy this contract.
+ *
+ * @tparam K key type of the store
+ * @tparam V value type of the store
+ * @param supplier the [[SessionBytesStoreSupplier]] used to materialize
the store
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @return a new [[Materialized]] instance with the given supplier
+ */
+ def as[K, V](supplier: SessionBytesStoreSupplier)(implicit keySerde:
Serde[K],
+ valueSerde: Serde[V]):
MaterializedJ[K, V, ByteArraySessionStore] =
+
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+ /**
+ * Materialize a [[org.apache.kafka.streams.state.KeyValueStore]] using the
provided [[KeyValueBytesStoreSupplier]].
+ *
+ * @tparam K key type of the store
+ * @tparam V value type of the store
+ * @param supplier the [[KeyValueBytesStoreSupplier]] used to materialize
the store
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @return a new [[Materialized]] instance with the given supplier
+ */
+ def as[K, V](
+ supplier: KeyValueBytesStoreSupplier
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V,
ByteArrayKeyValueStore] =
+
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
+}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
new file mode 100644
index 0000000..351e0a5
--- /dev/null
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Produced => ProducedJ}
+import org.apache.kafka.streams.processor.StreamPartitioner
+
+object Produced {
+
+ /**
+ * Create a Produced instance with provided keySerde and valueSerde.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @param keySerde Serde to use for serializing the key
+ * @param valueSerde Serde to use for serializing the value
+ * @return A new [[Produced]] instance configured with keySerde and
valueSerde
+ * @see KStream#through(String, Produced)
+ * @see KStream#to(String, Produced)
+ */
+ def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]):
ProducedJ[K, V] =
+ ProducedJ.`with`(keySerde, valueSerde)
+
+ /**
+ * Create a Produced instance with provided keySerde, valueSerde, and
partitioner.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @param partitioner the function used to determine how records are
distributed among partitions of the topic,
+ * if not specified and `keySerde` provides a
+ *
[[org.apache.kafka.streams.kstream.internals.WindowedSerializer]] for the key
+ *
[[org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner]] will be
+ * used—otherwise
[[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]
+ * will be used
+ * @param keySerde Serde to use for serializing the key
+ * @param valueSerde Serde to use for serializing the value
+ * @return A new [[Produced]] instance configured with keySerde, valueSerde,
and partitioner
+ * @see KStream#through(String, Produced)
+ * @see KStream#to(String, Produced)
+ */
+ def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde:
Serde[K],
+ valueSerde:
Serde[V]): ProducedJ[K, V] =
+ ProducedJ.`with`(keySerde, valueSerde, partitioner)
+}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
new file mode 100644
index 0000000..f48d9bf
--- /dev/null
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Serialized => SerializedJ}
+
+object Serialized {
+
+ /**
+ * Construct a `Serialized` instance with the provided key and value
[[Serde]]s.
+ * If the [[Serde]] params are `null` the default serdes defined in the
configs will be used.
+ *
+ * @tparam K the key type
+ * @tparam V the value type
+ * @param keySerde keySerde that will be used to materialize a stream
+ * @param valueSerde valueSerde that will be used to materialize a stream
+ * @return a new instance of [[Serialized]] configured with the provided
serdes
+ */
+ def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]):
SerializedJ[K, V] =
+ SerializedJ.`with`(keySerde, valueSerde)
+}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 9be5794..c54ba4f 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
/**
* Wraps the Java class TimeWindowedKStream and delegates method calls to the
underlying Java object.
*
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
* @param inner The underlying Java abstraction for TimeWindowedKStream
*
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream`
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
similarity index 61%
copy from
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
copy to
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
index fb802ea..842dd79 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
@@ -14,22 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.scala
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Serialized;
-
-class SerializedInternal<K, V> extends Serialized<K, V> {
- SerializedInternal(final Serialized<K, V> serialized) {
- super(serialized);
- }
-
- Serde<K> keySerde() {
- return keySerde;
- }
-
- Serde<V> valueSerde() {
- return valueSerde;
- }
+import org.apache.kafka.streams.processor.StateStore
+package object kstream {
+ type Materialized[K, V, S <: StateStore] =
org.apache.kafka.streams.kstream.Materialized[K, V, S]
+ type Serialized[K, V] = org.apache.kafka.streams.kstream.Serialized[K, V]
+ type Consumed[K, V] = org.apache.kafka.streams.kstream.Consumed[K, V]
+ type Produced[K, V] = org.apache.kafka.streams.kstream.Produced[K, V]
+ type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO]
}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index b596dd3..889e67c 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -21,9 +21,20 @@ package org.apache.kafka.streams.scala
import java.util.regex.Pattern
-import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ,
KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.kstream.{
+ KeyValueMapper,
+ Reducer,
+ Transformer,
+ TransformerSupplier,
+ ValueJoiner,
+ ValueMapper,
+ KGroupedStream => KGroupedStreamJ,
+ KStream => KStreamJ,
+ KTable => KTableJ
+}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
import org.junit.Assert._
@@ -153,10 +164,10 @@ class TopologyTest extends JUnitSuite {
val builder: StreamsBuilderJ = new StreamsBuilderJ()
val userClicksStream: KStreamJ[String, JLong] =
- builder.stream[String, JLong](userClicksTopic,
Consumed.`with`(Serdes.String, Serdes.JavaLong))
+ builder.stream[String, JLong](userClicksTopic, Consumed.`with`[String,
JLong])
val userRegionsTable: KTableJ[String, String] =
- builder.table[String, String](userRegionsTopic,
Consumed.`with`(Serdes.String, Serdes.String))
+ builder.table[String, String](userRegionsTopic,
Consumed.`with`[String, String])
// Join the stream against the table.
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] =
userClicksStream
@@ -166,7 +177,7 @@ class TopologyTest extends JUnitSuite {
def apply(clicks: JLong, region: String): (String, JLong) =
(if (region == null) "UNKNOWN" else region, clicks)
},
- Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong,
Serdes.String)
+ Joined.`with`[String, JLong, String]
)
// Change the stream from <user> -> <region, clicks> to <region> ->
<clicks>
@@ -180,7 +191,7 @@ class TopologyTest extends JUnitSuite {
// Compute the total per region by summing the individual click counts
per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
- .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
+ .groupByKey(Serialized.`with`[String, JLong])
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 9d821be..bbc84f6 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -29,7 +29,6 @@ import org.junit.rules.TemporaryFolder
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams._
import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster,
IntegrationTestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
new file mode 100644
index 0000000..87b478c
--- /dev/null
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.Serdes
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class ConsumedTest extends FlatSpec with Matchers {
+
+ "Create a Consumed" should "create a Consumed with Serdes" in {
+ val consumed: Consumed[String, Long] = Consumed.`with`[String, Long]
+
+ val internalConsumed = new ConsumedInternal(consumed)
+ internalConsumed.keySerde shouldBe Serdes.String
+ internalConsumed.valueSerde shouldBe Serdes.Long
+ }
+
+ "Create a Consumed with timestampExtractor and resetPolicy" should "create a
Consumed with Serdes, timestampExtractor and resetPolicy" in {
+ val timestampExtractor = new FailOnInvalidTimestamp()
+ val resetPolicy = Topology.AutoOffsetReset.LATEST
+ val consumed: Consumed[String, Long] =
+ Consumed.`with`[String, Long](timestampExtractor, resetPolicy)
+
+ val internalConsumed = new ConsumedInternal(consumed)
+ internalConsumed.keySerde shouldBe Serdes.String
+ internalConsumed.valueSerde shouldBe Serdes.Long
+ internalConsumed.timestampExtractor shouldBe timestampExtractor
+ internalConsumed.offsetResetPolicy shouldBe resetPolicy
+ }
+
+ "Create a Consumed with timestampExtractor" should "create a Consumed with
Serdes and timestampExtractor" in {
+ val timestampExtractor = new FailOnInvalidTimestamp()
+ val consumed: Consumed[String, Long] = Consumed.`with`[String,
Long](timestampExtractor)
+
+ val internalConsumed = new ConsumedInternal(consumed)
+ internalConsumed.keySerde shouldBe Serdes.String
+ internalConsumed.valueSerde shouldBe Serdes.Long
+ internalConsumed.timestampExtractor shouldBe timestampExtractor
+ }
+
+ "Create a Consumed with resetPolicy" should "create a Consumed with Serdes
and resetPolicy" in {
+ val resetPolicy = Topology.AutoOffsetReset.LATEST
+ val consumed: Consumed[String, Long] = Consumed.`with`[String,
Long](resetPolicy)
+
+ val internalConsumed = new ConsumedInternal(consumed)
+ internalConsumed.keySerde shouldBe Serdes.String
+ internalConsumed.valueSerde shouldBe Serdes.Long
+ internalConsumed.offsetResetPolicy shouldBe resetPolicy
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
similarity index 55%
copy from
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
copy to
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
index 3197244..f9fcbb5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
@@ -1,4 +1,6 @@
/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,26 +16,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+package org.apache.kafka.streams.scala.kstream
-public class ProducedInternal<K, V> extends Produced<K, V> {
- ProducedInternal(final Produced<K, V> produced) {
- super(produced);
- }
+import org.apache.kafka.streams.scala.Serdes
+import org.apache.kafka.streams.scala.Serdes._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
- public Serde<K> keySerde() {
- return keySerde;
- }
+@RunWith(classOf[JUnitRunner])
+class JoinedTest extends FlatSpec with Matchers {
- public Serde<V> valueSerde() {
- return valueSerde;
- }
+ "Create a Joined" should "create a Joined with Serdes" in {
+ val joined: Joined[String, Long, Int] = Joined.`with`[String, Long, Int]
- public StreamPartitioner<? super K, ? super V> streamPartitioner() {
- return partitioner;
- }
+ joined.keySerde shouldBe Serdes.String
+ joined.valueSerde shouldBe Serdes.Long
+ joined.otherValueSerde shouldBe Serdes.Integer
+ }
}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
similarity index 98%
rename from
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
rename to
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 3fdfee6..f339756 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -16,11 +16,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.scala
+package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.JoinWindows
-import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.utils.TestDriver
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
similarity index 95%
rename from
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
rename to
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index 2e9c821..dc080f1 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -16,12 +16,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.scala
+package org.apache.kafka.streams.scala.kstream
-import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.utils.TestDriver
+import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
@@ -122,10 +122,7 @@ class KTableTest extends FlatSpec with Matchers with
TestDriver {
val sourceTopic2 = "source2"
val sinkTopic = "sink"
val stateStore = "store"
- val materialized = Materialized
- .as[String, Long, ByteArrayKeyValueStore](stateStore)
- .withKeySerde(Serdes.String)
- .withValueSerde(Serdes.Long)
+ val materialized = Materialized.as[String, Long,
ByteArrayKeyValueStore](stateStore)
val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _)
=> key).count()
val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _)
=> key).count()
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/MaterializedTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/MaterializedTest.scala
new file mode 100644
index 0000000..8d24efe
--- /dev/null
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/MaterializedTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.state.Stores
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class MaterializedTest extends FlatSpec with Matchers {
+
+ "Create a Materialized" should "create a Materialized with Serdes" in {
+ val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
+ Materialized.`with`[String, Long, ByteArrayKeyValueStore]
+
+ val internalMaterialized = new MaterializedInternal(materialized)
+ internalMaterialized.keySerde shouldBe Serdes.String
+ internalMaterialized.valueSerde shouldBe Serdes.Long
+ }
+
+ "Create a Materialized with a store name" should "create a Materialized with
Serdes and a store name" in {
+ val storeName = "store"
+ val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
+ Materialized.as[String, Long, ByteArrayKeyValueStore](storeName)
+
+ val internalMaterialized = new MaterializedInternal(materialized)
+ internalMaterialized.keySerde shouldBe Serdes.String
+ internalMaterialized.valueSerde shouldBe Serdes.Long
+ internalMaterialized.storeName shouldBe storeName
+ }
+
+ "Create a Materialized with a window store supplier" should "create a
Materialized with Serdes and a store supplier" in {
+ val storeSupplier = Stores.persistentWindowStore("store", 1, 1, true)
+ val materialized: Materialized[String, Long, ByteArrayWindowStore] =
+ Materialized.as[String, Long](storeSupplier)
+
+ val internalMaterialized = new MaterializedInternal(materialized)
+ internalMaterialized.keySerde shouldBe Serdes.String
+ internalMaterialized.valueSerde shouldBe Serdes.Long
+ internalMaterialized.storeSupplier shouldBe storeSupplier
+ }
+
+ "Create a Materialized with a key value store supplier" should "create a
Materialized with Serdes and a store supplier" in {
+ val storeSupplier = Stores.persistentKeyValueStore("store")
+ val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
+ Materialized.as[String, Long](storeSupplier)
+
+ val internalMaterialized = new MaterializedInternal(materialized)
+ internalMaterialized.keySerde shouldBe Serdes.String
+ internalMaterialized.valueSerde shouldBe Serdes.Long
+ internalMaterialized.storeSupplier shouldBe storeSupplier
+ }
+
+ "Create a Materialized with a session store supplier" should "create a
Materialized with Serdes and a store supplier" in {
+ val storeSupplier = Stores.persistentSessionStore("store", 1)
+ val materialized: Materialized[String, Long, ByteArraySessionStore] =
+ Materialized.as[String, Long](storeSupplier)
+
+ val internalMaterialized = new MaterializedInternal(materialized)
+ internalMaterialized.keySerde shouldBe Serdes.String
+ internalMaterialized.valueSerde shouldBe Serdes.Long
+ internalMaterialized.storeSupplier shouldBe storeSupplier
+ }
+}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala
new file mode 100644
index 0000000..7a248ab
--- /dev/null
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.streams.kstream.internals.ProducedInternal
+import org.apache.kafka.streams.processor.StreamPartitioner
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.Serdes
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class ProducedTest extends FlatSpec with Matchers {
+
+ "Create a Produced" should "create a Produced with Serdes" in {
+ val produced: Produced[String, Long] = Produced.`with`[String, Long]
+
+ val internalProduced = new ProducedInternal(produced)
+ internalProduced.keySerde shouldBe Serdes.String
+ internalProduced.valueSerde shouldBe Serdes.Long
+ }
+
+ "Create a Produced with streamPartitioner" should "create a
Produced with Serdes and streamPartitioner" in {
+ val partitioner = new StreamPartitioner[String, Long] {
+ override def partition(topic: String, key: String, value: Long,
numPartitions: Int): Integer = 0
+ }
+ val produced: Produced[String, Long] = Produced.`with`(partitioner)
+
+ val internalConsumed = new ProducedInternal(produced)
+ internalConsumed.keySerde shouldBe Serdes.String
+ internalConsumed.valueSerde shouldBe Serdes.Long
+ internalConsumed.streamPartitioner shouldBe partitioner
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
similarity index 51%
copy from
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
copy to
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
index 3197244..8c072b8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
@@ -1,4 +1,6 @@
/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,26 +16,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+package org.apache.kafka.streams.scala.kstream
-public class ProducedInternal<K, V> extends Produced<K, V> {
- ProducedInternal(final Produced<K, V> produced) {
- super(produced);
- }
+import org.apache.kafka.streams.kstream.internals.SerializedInternal
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.Serdes
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
- public Serde<K> keySerde() {
- return keySerde;
- }
+@RunWith(classOf[JUnitRunner])
+class SerializedTest extends FlatSpec with Matchers {
- public Serde<V> valueSerde() {
- return valueSerde;
- }
+ "Create a Serialized" should "create a Serialized with Serdes" in {
+ val serialized: Serialized[String, Long] = Serialized.`with`[String, Long]
- public StreamPartitioner<? super K, ? super V> streamPartitioner() {
- return partitioner;
- }
+ val internalSerialized = new SerializedInternal(serialized)
+ internalSerialized.keySerde shouldBe Serdes.String
+ internalSerialized.valueSerde shouldBe Serdes.Long
+ }
}