Initial version of Table API Initial version of the Table API. It includes: - Core table API (Table, TableDescriptor, TableSpec) - Local table implementations for in-memory and RocksDb stores - The sendTo(Table) and stream-table join operators
nickpan47 xinyuiscool prateekm could you help review? Author: Wei Song <[email protected]> Reviewers: Yi Pan <[email protected]>, Christopher Pettitt <[email protected]> Closes #349 from weisong44/table-api-14 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e74998c5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e74998c5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e74998c5 Branch: refs/heads/master Commit: e74998c5e58c891c0744ce1be2b13f7c5861656a Parents: e3efdf5 Author: Wei Song <[email protected]> Authored: Tue Dec 5 13:23:44 2017 -0800 Committer: Prateek Maheshwari <[email protected]> Committed: Tue Dec 5 13:23:44 2017 -0800 ---------------------------------------------------------------------- build.gradle | 9 + .../java/org/apache/samza/operators/KV.java | 2 +- .../apache/samza/operators/MessageStream.java | 51 +++- .../org/apache/samza/operators/StreamGraph.java | 15 + .../apache/samza/operators/TableDescriptor.java | 73 +++++ .../functions/StreamTableJoinFunction.java | 59 ++++ .../table/LocalStoreBackedTableProvider.java | 37 +++ .../org/apache/samza/table/ReadWriteTable.java | 75 +++++ .../org/apache/samza/table/ReadableTable.java | 61 ++++ .../main/java/org/apache/samza/table/Table.java | 31 ++ .../org/apache/samza/table/TableProvider.java | 57 ++++ .../samza/table/TableProviderFactory.java | 35 +++ .../java/org/apache/samza/table/TableSpec.java | 125 ++++++++ .../java/org/apache/samza/task/TaskContext.java | 6 +- .../apache/samza/config/JavaTableConfig.java | 87 ++++++ .../apache/samza/container/TaskContextImpl.java | 24 +- .../samza/execution/ExecutionPlanner.java | 5 + .../org/apache/samza/execution/JobGraph.java | 16 + .../samza/execution/JobGraphJsonGenerator.java | 50 +++ .../org/apache/samza/execution/JobNode.java | 54 +++- .../samza/operators/BaseTableDescriptor.java | 94 ++++++ .../samza/operators/MessageStreamImpl.java | 31 +- 
.../apache/samza/operators/StreamGraphImpl.java | 40 ++- .../org/apache/samza/operators/TableImpl.java | 40 +++ .../samza/operators/impl/OperatorImplGraph.java | 30 +- .../operators/impl/SendToTableOperatorImpl.java | 71 +++++ .../impl/StreamTableJoinOperatorImpl.java | 82 +++++ .../samza/operators/spec/OperatorSpec.java | 12 +- .../samza/operators/spec/OperatorSpecs.java | 44 ++- .../operators/spec/SendToTableOperatorSpec.java | 65 ++++ .../spec/StreamTableJoinOperatorSpec.java | 67 ++++ .../org/apache/samza/table/TableManager.java | 153 ++++++++++ .../apache/samza/container/SamzaContainer.scala | 75 +++-- .../apache/samza/container/TaskInstance.scala | 40 ++- .../samza/config/TestJavaTableConfig.java | 58 ++++ .../samza/operators/TestMessageStreamImpl.java | 68 ++++- .../samza/operators/TestStreamGraphImpl.java | 25 +- .../impl/TestStreamTableJoinOperatorImpl.java | 101 ++++++ .../apache/samza/table/TestTableManager.java | 176 +++++++++++ .../org/apache/samza/task/TestAsyncRunLoop.java | 14 +- .../kv/inmemory/InMemoryTableDescriptor.java | 59 ++++ .../kv/inmemory/InMemoryTableProvider.java | 65 ++++ .../inmemory/InMemoryTableProviderFactory.java | 33 ++ .../inmemory/TestInMemoryTableDescriptor.java | 48 +++ .../kv/inmemory/TestInMemoryTableProvider.java | 65 ++++ .../storage/kv/RocksDbTableDescriptor.java | 232 ++++++++++++++ .../samza/storage/kv/RocksDbTableProvider.java | 64 ++++ .../storage/kv/RocksDbTableProviderFactory.java | 31 ++ .../storage/kv/TestRocksDbTableDescriptor.java | 87 ++++++ .../storage/kv/TestRocksDbTableProvider.java | 66 ++++ .../kv/BaseLocalStoreBackedTableDescriptor.java | 56 ++++ .../kv/BaseLocalStoreBackedTableProvider.java | 92 ++++++ .../kv/LocalStoreBackedReadWriteTable.java | 68 +++++ .../kv/LocalStoreBackedReadableTable.java | 61 ++++ .../TestLocalBaseStoreBackedTableProvider.java | 77 +++++ .../apache/samza/test/table/TestLocalTable.java | 304 +++++++++++++++++++ .../apache/samza/test/table/TestTableData.java | 200 ++++++++++++ 
.../samza/test/util/ArraySystemConsumer.java | 4 +- .../samza/test/util/SimpleSystemAdmin.java | 26 +- 59 files changed, 3668 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 50cc5e0..be1baf7 100644 --- a/build.gradle +++ b/build.gradle @@ -565,6 +565,15 @@ project(":samza-kv_$scalaVersion") { project(":samza-kv-inmemory_$scalaVersion") { apply plugin: 'scala' + // Force scala joint compilation + sourceSets.main.scala.srcDir "src/main/java" + sourceSets.test.scala.srcDir "src/test/java" + + // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting + // tasks.compileTestJava.enabled = false + sourceSets.main.java.srcDirs = [] + sourceSets.test.java.srcDirs = [] + dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/KV.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/KV.java b/samza-api/src/main/java/org/apache/samza/operators/KV.java index 0bed3b9..824bcb4 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/KV.java +++ b/samza-api/src/main/java/org/apache/samza/operators/KV.java @@ -25,7 +25,7 @@ package org.apache.samza.operators; * @param <K> type of the key * @param <V> type of the value */ -public class KV<K, V> { +public final class KV<K, V> { public final K key; public final V value; http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index acb2f33..f0a5526 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -18,21 +18,23 @@ */ package org.apache.samza.operators; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.Function; + import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.Function; +import org.apache.samza.table.Table; /** @@ -154,6 +156,34 @@ public interface MessageStream<M> { Duration ttl, String id); /** + * Joins this {@link MessageStream} with another {@link Table} using the provided + * pairwise {@link StreamTableJoinFunction}. + * <p> + * The type of input message is expected to be {@link KV}. + * <p> + * Records are looked up from the joined table using the join key, join function + * is applied and join results are emitted as matches are found. + * <p> + * The join function allows implementation of both inner and left outer join. A null will be + * passed to the join function, if no record matching the join key is found in the table. 
+ * The join function can choose to return an instance of JM (left outer join) or null + * (inner join); if null is returned, the null result is dropped and not processed further. + * <p> + * Both the input stream and table being joined must have the same number of partitions, + * and should be partitioned by the same join key. + * <p> + * + * @param table the table being joined + * @param joinFn the join function + * @param <K> the type of join key + * @param <R> the type of table record + * @param <JM> the type of messages resulting from the {@code joinFn} + * @return the joined {@link MessageStream} + */ + <K, R extends KV, JM> MessageStream<JM> join(Table<R> table, + StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn); + + /** * Merges all {@code otherStreams} with this {@link MessageStream}. * <p> * The merged stream contains messages from all streams in the order they arrive. @@ -235,4 +265,15 @@ public interface MessageStream<M> { */ <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> valueExtractor, String id); + + /** + * Sends messages in this {@link MessageStream} to a {@link Table}. The type of input message is expected + * to be {@link KV}, otherwise a {@link ClassCastException} will be thrown. 
+ * + * @param table the table to write messages to + * @param <K> the type of key in the table + * @param <V> the type of record value in the table + */ + <K, V> void sendTo(Table<KV<K, V>> table); + } http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java index 4930631..6871bc7 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java @@ -20,6 +20,7 @@ package org.apache.samza.operators; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.serializers.Serde; +import org.apache.samza.table.Table; /** @@ -121,6 +122,20 @@ public interface StreamGraph { <M> OutputStream<M> getOutputStream(String streamId); /** + * Gets the {@link Table} corresponding to the {@link TableDescriptor}. + * <p> + * Multiple invocations of this method with the same {@link TableDescriptor} will throw an + * {@link IllegalStateException}. + * + * @param tableDesc the {@link TableDescriptor} + * @param <K> the type of the key + * @param <V> the type of the value + * @return the {@link Table} corresponding to the {@code tableDesc} + * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor} + */ + <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc); + + /** * Sets the {@link ContextManager} for this {@link StreamGraph}. 
* <p> * The provided {@link ContextManager} can be used to setup shared context between the operator functions http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java new file mode 100644 index 0000000..a60b6a9 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.serializers.KVSerde; + +/** + * User facing class to collect metadata that fully describes a + * Samza table. This interface should be implemented by concrete table implementations. + * <p> + * Typical user code should look like the following, notice <code>withConfig()</code> + * is defined in this class and the rest in subclasses. 
+ * + * <pre> + * {@code + * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl") + * .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8"))) + * .withBlockSize(1024) + * .withConfig("some-key", "some-value"); + * } + * </pre> + + * Once constructed, a table descriptor can be registered with the system. Internally, + * the table descriptor is then converted to a {@link org.apache.samza.table.TableSpec}, + * which is used to track tables internally. + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + * @param <D> the type of the concrete table descriptor + */ [email protected] +public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> { + + /** + * Get the Id of the table + * @return Id of the table + */ + String getTableId(); + + /** + * Set the Serde for this table + * @param serde the serde + * @return this table descriptor instance + * @throws IllegalArgumentException if null is provided + */ + D withSerde(KVSerde<K, V> serde); + + /** + * Add a configuration entry for the table + * @param key the key + * @param value the value + * @return this table descriptor instance + */ + D withConfig(String key, String value); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java new file mode 100644 index 0000000..6afcf67 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * Joins incoming messages with records from a table by the join key. + * + * @param <K> the type of join key + * @param <M> type of input message + * @param <R> type of the table record + * @param <JM> type of join results + */ [email protected] +public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction { + + /** + * Joins the provided messages and table record, returns the joined message. 
+ * + * @param message the input message + * @param record the table record value + * @return the join result + */ + JM apply(M message, R record); + + /** + * Retrieve the join key from incoming messages + * + * @param message incoming message + * @return the join key + */ + K getMessageKey(M message); + + /** + * Retrieve the join key from table record + * + * @param record table record + * @return the join key + */ + K getRecordKey(R record); +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java b/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java new file mode 100644 index 0000000..21630ab --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table; + +import org.apache.samza.storage.StorageEngine; + + +/** + * Interface for tables backed by Samza local stores. 
The backing stores are + * injected during initialization of the table. Since the lifecycle + * of the underlying stores is already managed by the Samza container, + * the table provider will not manage the lifecycle of the backing + * stores. + */ +public interface LocalStoreBackedTableProvider extends TableProvider { + /** + * Initializes the table provider with the backing store + * @param store the backing store + */ + void init(StorageEngine store); +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java new file mode 100644 index 0000000..d617153 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table; + +import java.util.List; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.storage.kv.Entry; + +/** + * + * A table that supports get, put and delete by one or more keys + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ [email protected] +public interface ReadWriteTable<K, V> extends ReadableTable<K, V> { + + /** + * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}. + * + * @param key the key with which the specified {@code value} is to be associated. + * @param value the value with which the specified {@code key} is to be associated. + * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}. + */ + void put(K key, V value); + + /** + * Updates the mappings of the specified key-value {@code entries}. + * + * @param entries the updated mappings to put into this table. + * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value. + */ + void putAll(List<Entry<K, V>> entries); + + /** + * Deletes the mapping for the specified {@code key} from this table (if such mapping exists). + * + * @param key the key for which the mapping is to be deleted. + * @throws NullPointerException if the specified {@code key} is {@code null}. + */ + void delete(K key); + + /** + * Deletes the mappings for the specified {@code keys} from this table. + * + * @param keys the keys for which the mappings are to be deleted. + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. + */ + void deleteAll(List<K> keys); + + + /** + * Flushes the underlying store of this table, if applicable. 
+ */ + void flush(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java new file mode 100644 index 0000000..5ad6e0f --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table; + +import java.util.List; +import java.util.Map; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.KV; + + +/** + * + * A table that supports get by one or more keys + * + * @param <K> the type of the record key in this table + * @param <V> the type of the record value in this table + */ [email protected] +public interface ReadableTable<K, V> extends Table<KV<K, V>> { + + /** + * Gets the value associated with the specified {@code key}. + * + * @param key the key with which the associated value is to be fetched. 
+ * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}. + * @throws NullPointerException if the specified {@code key} is {@code null}. + */ + V get(K key); + + /** + * Gets the values with which the specified {@code keys} are associated. + * + * @param keys the keys with which the associated values are to be fetched. + * @return a map of the keys that were found and their respective values. + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. + */ + Map<K, V> getAll(List<K> keys); + + /** + * Close the table and release any resources acquired + */ + void close(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/Table.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java b/samza-api/src/main/java/org/apache/samza/table/Table.java new file mode 100644 index 0000000..767e176 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/Table.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table; + +import org.apache.samza.annotation.InterfaceStability; + +/** + * + * Marker interface for a table. + * + * @param <R> the type of records in the table + */ [email protected] +public interface Table<R> { +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/TableProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java new file mode 100644 index 0000000..54c6f5d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table; + +import java.util.Map; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * A table provider provides the implementation for a table. It ensures a table is + * properly constructed and also manages its lifecycle. 
+ */ [email protected] +public interface TableProvider { + /** + * Get an instance of the table for read/write operations + * @return the underlying table + */ + Table getTable(); + + /** + * Generate any configuration for this table. The generated configuration + * is used by the Samza container to construct this table and any components + * necessary. + * + * @param config the current configuration + * @return configuration for this table + */ + Map<String, String> generateConfig(Map<String, String> config); + + /** + * Start the underlying table + */ + void start(); + + /** + * Stop the underlying table + */ + void stop(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java new file mode 100644 index 0000000..1bb0196 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * Factory of a table provider object + */ [email protected] +public interface TableProviderFactory { + /** + * Constructs an instance of the table provider based on a given table spec + * @param tableSpec the table spec + * @return the table provider + */ + TableProvider getTableProvider(TableSpec tableSpec); +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/TableSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java new file mode 100644 index 0000000..68043f9 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.serializers.KVSerde; + + +/** + * TableSpec is a blueprint for creating, validating, or simply describing a table in the runtime environment. + * + * It is typically created indirectly by constructing an instance of {@link org.apache.samza.operators.TableDescriptor}, + * and then invoking <code>BaseTableDescriptor.getTableSpec()</code>. + * + * It has specific attributes for common behaviors that Samza uses. + * + * It has the table provider factory, which provides the actual table implementation. + * + * It also includes a map of configurations which may be implementation-specific. + * + * It is immutable by design. + */ [email protected] +public class TableSpec { + + private final String id; + private final KVSerde serde; + private final String tableProviderFactoryClassName; + private final Map<String, String> config = new HashMap<>(); + + /** + * Default constructor + */ + public TableSpec() { + this.id = null; + this.serde = null; + this.tableProviderFactoryClassName = null; + } + + /** + * Constructs a {@link TableSpec} + * + * @param tableId Id of the table + * @param tableProviderFactoryClassName class name of the table provider factory + * @param serde the serde + * @param config implementation specific configuration + */ + public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, + Map<String, String> config) { + this.id = tableId; + this.serde = serde; + this.tableProviderFactoryClassName = tableProviderFactoryClassName; + this.config.putAll(config); + } + + /** + * Get the Id of the table + * @return Id of the table + */ + public String getId() { + return id; + } + + /** + * Get the serde + * @param <K> the type of the key + * @param <V> the type of the value + * @return the key-value serde + */ + public <K, V> KVSerde<K, V> getSerde() { + return 
serde; + } + + /** + * Get the class name of the table provider factory + * @return class name of the table provider factory + */ + public String getTableProviderFactoryClassName() { + return tableProviderFactoryClassName; + } + + /** + * Get implementation configuration for the table + * @return configuration for the table + */ + public Map<String, String> getConfig() { + return Collections.unmodifiableMap(config); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !getClass().equals(o.getClass())) { + return false; + } + return java.util.Objects.equals(id, ((TableSpec) o).id); // id is null for default-constructed specs + } + + @Override + public int hashCode() { + return java.util.Objects.hashCode(id); // 0 for a null id, consistent with equals() + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index 4ef3d30..11ffacc 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -19,12 +19,14 @@ package org.apache.samza.task; +import java.util.Set; + import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.TaskName; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.table.Table; -import java.util.Set; /** * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during @@ -37,6 +39,8 @@ public interface TaskContext { Object getStore(String name); + Table getTable(String tableId); + TaskName getTaskName(); SamzaContainerContext getSamzaContainerContext(); http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java new file mode 100644 index 0000000..6cc3986 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.config; + +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A helper class for handling table configuration + */ +public class JavaTableConfig extends MapConfig { + + // Prefix + public static final String TABLES_PREFIX = "tables."; + public static final String TABLE_ID_PREFIX = TABLES_PREFIX + "%s"; + + // Suffix + public static final String TABLE_PROVIDER_FACTORY_SUFFIX = ".provider.factory"; + + // Config keys + public static final String TABLE_PROVIDER_FACTORY = String.format("%s.provider.factory", TABLE_ID_PREFIX); + public static final String TABLE_KEY_SERDE = String.format("%s.key.serde", TABLE_ID_PREFIX); + public static final String TABLE_VALUE_SERDE = String.format("%s.value.serde", TABLE_ID_PREFIX); + + + public JavaTableConfig(Config config) { + super(config); + } + + /** + * Get Id's of all tables that have a provider factory configured; an Id is the key minus the factory suffix + * @return list of table Id's + */ + public List<String> getTableIds() { + Config subConfig = subset(TABLES_PREFIX, true); + Set<String> tableNames = subConfig.keySet().stream() + .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX)) + .map(k -> k.substring(0, k.length() - TABLE_PROVIDER_FACTORY_SUFFIX.length())) + .collect(Collectors.toSet()); + return new LinkedList<>(tableNames); + } + + /** + * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table + * @param tableId Id of the table + * @return the {@link org.apache.samza.table.TableProviderFactory} class name + */ + public String getTableProviderFactory(String tableId) { + return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null); + } + + /** + * Get registry keys of key serde for this table + * @param tableId Id of the table + * @return serde registry key + */ + public String getKeySerde(String tableId) { + return get(String.format(TABLE_KEY_SERDE, tableId), null); + } + + /** + * Get registry keys of value serde for this table + * @param tableId Id of the table + * @return serde registry key + */ +
public String getValueSerde(String tableId) { + return get(String.format(TABLE_VALUE_SERDE, tableId), null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java index aa622a3..0248486 100644 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java @@ -19,7 +19,10 @@ package org.apache.samza.container; -import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.ReadableMetricsRegistry; @@ -27,13 +30,13 @@ import org.apache.samza.storage.TaskStorageManager; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableManager; import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import com.google.common.collect.ImmutableSet; public class TaskContextImpl implements TaskContext { private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class); @@ -44,6 +47,7 @@ public class TaskContextImpl implements TaskContext { private final Set<SystemStreamPartition> systemStreamPartitions; private final OffsetManager offsetManager; private final TaskStorageManager storageManager; + private final TableManager tableManager; private final JobModel jobModel; private final 
StreamMetadataCache streamMetadataCache; private final Map<String, Object> objectRegistry = new HashMap<>(); @@ -56,6 +60,7 @@ public class TaskContextImpl implements TaskContext { Set<SystemStreamPartition> systemStreamPartitions, OffsetManager offsetManager, TaskStorageManager storageManager, + TableManager tableManager, JobModel jobModel, StreamMetadataCache streamMetadataCache) { this.taskName = taskName; @@ -64,6 +69,7 @@ public class TaskContextImpl implements TaskContext { this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions); this.offsetManager = offsetManager; this.storageManager = storageManager; + this.tableManager = tableManager; this.jobModel = jobModel; this.streamMetadataCache = streamMetadataCache; } @@ -89,6 +95,16 @@ public class TaskContextImpl implements TaskContext { } @Override + public Table getTable(String tableId) { + if (tableManager != null) { + return tableManager.getTable(tableId); + } else { + LOG.warn("No table manager found"); + return null; + } + } + + @Override public TaskName getTaskName() { return taskName; } http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index 468aab9..e2c122a 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -39,6 +39,7 @@ import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; +import org.apache.samza.table.TableSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,7 @@ public class 
ExecutionPlanner { Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet()); Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet()); Set<StreamSpec> intStreams = new HashSet<>(sourceStreams); + Set<TableSpec> tables = new HashSet<>(streamGraph.getTables().keySet()); intStreams.retainAll(sinkStreams); sourceStreams.removeAll(intStreams); sinkStreams.removeAll(intStreams); @@ -113,6 +115,9 @@ public class ExecutionPlanner { // add intermediate streams intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node)); + // add tables + tables.forEach(spec -> jobGraph.addTable(spec, node)); + jobGraph.validate(); return jobGraph; http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java index 2a09e90..4a09260 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java @@ -30,11 +30,13 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; + import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.TableSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +57,7 @@ import org.slf4j.LoggerFactory; private final Set<StreamEdge> sources = new HashSet<>(); private final Set<StreamEdge> sinks = new HashSet<>(); private final Set<StreamEdge> intermediateStreams = new HashSet<>(); + private final Set<TableSpec> tables = new HashSet<>(); private final Config config; private 
final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator(); @@ -86,6 +89,11 @@ import org.slf4j.LoggerFactory; .collect(Collectors.toList()); } + void addTable(TableSpec tableSpec, JobNode node) { + tables.add(tableSpec); + node.addTable(tableSpec); + } + @Override public String getPlanAsJson() throws Exception { return jsonGenerator.toJson(this); @@ -211,6 +219,14 @@ import org.slf4j.LoggerFactory; } /** + * Return the tables in the graph + * @return unmodifiable set of {@link TableSpec} + */ + Set<TableSpec> getTables() { + return Collections.unmodifiableSet(tables); + } + + /** * Return the intermediate streams in the graph * @return unmodifiable set of {@link StreamEdge} */ http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java index 03845e3..2729fa3 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java @@ -28,12 +28,15 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; + import org.apache.samza.config.ApplicationConfig; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; +import org.apache.samza.table.TableSpec; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; @@ -54,6 +57,15 @@ import 
org.codehaus.jackson.map.ObjectMapper; int partitionCount; } + static final class TableSpecJson { + @JsonProperty("id") + String id; + @JsonProperty("tableProviderFactory") + String tableProviderFactory; + @JsonProperty("config") + Map<String, String> config; + } + static final class StreamEdgeJson { @JsonProperty("streamSpec") StreamSpecJson streamSpec; @@ -97,6 +109,8 @@ import org.codehaus.jackson.map.ObjectMapper; Map<String, StreamEdgeJson> sinkStreams; @JsonProperty("intermediateStreams") Map<String, StreamEdgeJson> intermediateStreams; + @JsonProperty("tables") + Map<String, TableSpecJson> tables; @JsonProperty("applicationName") String applicationName; @JsonProperty("applicationId") @@ -119,9 +133,11 @@ import org.codehaus.jackson.map.ObjectMapper; jobGraphJson.sourceStreams = new HashMap<>(); jobGraphJson.sinkStreams = new HashMap<>(); jobGraphJson.intermediateStreams = new HashMap<>(); + jobGraphJson.tables = new HashMap<>(); jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams)); jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams)); jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams)); + jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables)); jobGraphJson.jobs = jobGraph.getJobNodes().stream() .map(jobNode -> buildJobNodeJson(jobNode)) @@ -206,6 +222,11 @@ import org.codehaus.jackson.map.ObjectMapper; map.put("outputStreamId", outputStream.getStreamSpec().getId()); } + if (spec instanceof StreamTableJoinOperatorSpec) { + TableSpec tableSpec = ((StreamTableJoinOperatorSpec) spec).getTableSpec(); + map.put("tableId", tableSpec.getId()); + } + if (spec instanceof JoinOperatorSpec) { map.put("ttlMs", ((JoinOperatorSpec) spec).getTtlMs()); } @@ -247,4 +268,33 @@ import org.codehaus.jackson.map.ObjectMapper; } return edgeJson; } + + /** + * Get or create the JSON POJO for a {@link TableSpec} + * @param tableSpec the {@link 
TableSpec} + * @param tableSpecs a map of tableId to {@link TableSpecJson} + * @return JSON representation of the {@link TableSpec} + */ + private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, TableSpecJson> tableSpecs) { + String tableId = tableSpec.getId(); + TableSpecJson tableSpecJson = tableSpecs.get(tableId); + if (tableSpecJson == null) { + tableSpecJson = buildTableJson(tableSpec); + tableSpecs.put(tableId, tableSpecJson); + } + return tableSpecJson; + } + + /** + * Create the JSON POJO for a {@link TableSpec} + * @param tableSpec the {@link TableSpec} + * @return JSON representation of the {@link TableSpec} + */ + private TableSpecJson buildTableJson(TableSpec tableSpec) { + TableSpecJson tableSpecJson = new TableSpecJson(); + tableSpecJson.id = tableSpec.getId(); + tableSpecJson.tableProviderFactory = tableSpec.getTableProviderFactoryClassName(); + tableSpecJson.config = tableSpec.getConfig(); + return tableSpecJson; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index 2e89292..4e337d9 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -19,7 +19,6 @@ package org.apache.samza.execution; -import com.google.common.base.Joiner; import java.util.ArrayList; import java.util.Base64; import java.util.Collection; @@ -29,7 +28,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; + import org.apache.samza.config.Config; +import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; @@ 
-47,10 +48,16 @@ import org.apache.samza.operators.util.MathUtils; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerializableSerde; import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.TableProvider; +import org.apache.samza.table.TableProviderFactory; +import org.apache.samza.table.TableSpec; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; + + /** * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution. @@ -67,6 +74,7 @@ public class JobNode { private final StreamGraphImpl streamGraph; private final List<StreamEdge> inEdges = new ArrayList<>(); private final List<StreamEdge> outEdges = new ArrayList<>(); + private final List<TableSpec> tables = new ArrayList<>(); private final Config config; JobNode(String jobName, String jobId, StreamGraphImpl streamGraph, Config config) { @@ -109,6 +117,10 @@ public class JobNode { return outEdges; } + void addTable(TableSpec tableSpec) { + tables.add(tableSpec); + } + /** * Generate the configs for a job * @param executionPlanJson JSON representation of the execution plan @@ -147,6 +159,19 @@ public class JobNode { // write serialized serde instances and stream serde configs to configs addSerdeConfigs(configs); + tables.forEach(tableSpec -> { + // Table provider factory + configs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()), + tableSpec.getTableProviderFactoryClassName()); + + // Note: no need to generate config for Serde's, as they are already produced by addSerdeConfigs() + + // Generate additional configuration + TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName()); + TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec); + 
configs.putAll(tableProvider.generateConfig(configs)); + }); + log.info("Job {} has generated configs {}", jobName, configs); String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName); @@ -209,11 +234,21 @@ public class JobNode { } }); + // collect all key and msg serde instances for tables + Map<String, Serde> tableKeySerdes = new HashMap<>(); + Map<String, Serde> tableValueSerdes = new HashMap<>(); + tables.forEach(tableSpec -> { + tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde()); + tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde()); + }); + // for each unique stream or store serde instance, generate a unique name and serialize to config HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values()); serdes.addAll(streamMsgSerdes.values()); serdes.addAll(storeKeySerdes.values()); serdes.addAll(storeMsgSerdes.values()); + serdes.addAll(tableKeySerdes.values()); + serdes.addAll(tableValueSerdes.values()); SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); Base64.Encoder base64Encoder = Base64.getEncoder(); Map<Serde, String> serdeUUIDs = new HashMap<>(); @@ -247,6 +282,17 @@ public class JobNode { String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName); configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde)); }); + + // set key and msg serdes for tables to the serde names generated above + tableKeySerdes.forEach((tableId, serde) -> { + String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId); + configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); + }); + + tableValueSerdes.forEach((tableId, serde) -> { + String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId); + configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); + }); } /** @@ -264,10 +310,14 @@ public class JobNode { // Filter out the join operators, and obtain a list of their ttl values List<Long> joinTtlIntervals = operatorSpecs.stream() - 
.filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN) + .filter(spec -> spec instanceof JoinOperatorSpec) .map(spec -> ((JoinOperatorSpec) spec).getTtlMs()) .collect(Collectors.toList()); + if (joinTtlIntervals.isEmpty()) { + return -1; + } + // Combine both the above lists List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals); candidateTimerIntervals.addAll(windowTimerIntervals); http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java new file mode 100644 index 0000000..b875c2e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.table.TableSpec; + + +/** + * Base class for all table descriptor implementations. + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + * @param <D> the type of the concrete table descriptor + */ +abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>> + implements TableDescriptor<K, V, D> { + + protected final String tableId; + + protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde()); + + protected final Map<String, String> config = new HashMap<>(); + + /** + * Constructs a table descriptor instance + * @param tableId Id of the table + */ + protected BaseTableDescriptor(String tableId) { + this.tableId = tableId; + } + + @Override + public D withConfig(String key, String value) { + config.put(key, value); + return (D) this; + } + + @Override + public D withSerde(KVSerde<K, V> serde) { + if (serde == null) { + throw new IllegalArgumentException("Serde cannot be null"); + } + this.serde = serde; + return (D) this; + } + + @Override + public String getTableId() { + return tableId; + } + + /** + * Generate config for {@link TableSpec}; this method is used internally. + * @param tableSpecConfig configuration for the {@link TableSpec} + */ + protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) { + tableSpecConfig.putAll(config); + } + + /** + * Validate that this table descriptor is constructed properly; this method is used internally. + */ + protected void validate() { + } + + /** + * Create a {@link TableSpec} from this table descriptor; this method is used internally. 
+ * + * @return the {@link TableSpec} + */ + abstract public TableSpec getTableSpec(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 3f4e40d..07af54f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -19,12 +19,17 @@ package org.apache.samza.operators; +import java.time.Duration; +import java.util.Collection; +import java.util.function.Function; + import org.apache.samza.SamzaException; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpec.OpCode; @@ -32,18 +37,18 @@ import org.apache.samza.operators.spec.OperatorSpecs; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.spec.PartitionByOperatorSpec; +import org.apache.samza.operators.spec.SendToTableOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import 
org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; - -import java.time.Duration; -import java.util.Collection; -import java.util.function.Function; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableSpec; /** @@ -138,6 +143,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override + public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table, + StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) { + TableSpec tableSpec = ((TableImpl) table).getTableSpec(); + StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec( + tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, this.graph.getNextOpId(OpCode.JOIN)); + this.operatorSpec.registerNextOperatorSpec(joinOpSpec); + return new MessageStreamImpl<>(this.graph, joinOpSpec); + } + + @Override public MessageStream<M> merge(Collection<? extends MessageStream<? 
extends M>> otherStreams) { if (otherStreams.isEmpty()) return this; String opId = this.graph.getNextOpId(OpCode.MERGE); @@ -176,4 +191,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> { protected OperatorSpec<?, M> getOperatorSpec() { return this.operatorSpec; } + + @Override + public <K, V> void sendTo(Table<KV<K, V>> table) { + SendToTableOperatorSpec<K, V> op = OperatorSpecs.createSendToTableOperatorSpec( + this.operatorSpec, ((TableImpl) table).getTableSpec(), this.graph.getNextOpId(OpCode.SEND_TO)); + this.operatorSpec.registerNextOperatorSpec(op); + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index d014cb9..b607c62 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -18,7 +18,15 @@ */ package org.apache.samza.operators; -import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; @@ -34,17 +42,12 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import 
java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; +import com.google.common.base.Preconditions; /** * A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to @@ -57,6 +60,7 @@ public class StreamGraphImpl implements StreamGraph { // We use a LHM for deterministic order in initializing and closing operators. private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); + private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>(); private final ApplicationRunner runner; private final Config config; @@ -146,6 +150,18 @@ public class StreamGraphImpl implements StreamGraph { } @Override + public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) { + TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec(); + if (tables.containsKey(tableSpec)) { + throw new IllegalStateException(String.format( + "getTable() invoked multiple times with the same tableId: %s", + tableDesc.getTableId())); + } + tables.put(tableSpec, new TableImpl(tableSpec)); + return tables.get(tableSpec); + } + + @Override public StreamGraph withContextManager(ContextManager contextManager) { this.contextManager = contextManager; return this; @@ -163,7 +179,7 @@ public class StreamGraphImpl implements StreamGraph { */ <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) { StreamSpec streamSpec = runner.getStreamSpec(streamId); - + Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec), "getIntermediateStream must not be called multiple times with the same streamId: " + streamId); @@ -190,6 +206,10 @@ public class StreamGraphImpl implements StreamGraph { return 
Collections.unmodifiableMap(outputStreams); } + public Map<TableSpec, TableImpl> getTables() { + return Collections.unmodifiableMap(tables); + } + public ContextManager getContextManager() { return this.contextManager; } http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java new file mode 100644 index 0000000..e671534 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import org.apache.samza.table.Table; +import org.apache.samza.table.TableSpec; + + +/** + * This class is the holder of a {@link TableSpec} + */ +public class TableImpl implements Table { + + private final TableSpec tableSpec; + + public TableImpl(TableSpec tableSpec) { + this.tableSpec = tableSpec; + } + + public TableSpec getTableSpec() { + return tableSpec; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 0bb12d2..ea278c1 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -18,10 +18,15 @@ */ package org.apache.samza.operators.impl; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.stream.Collectors; + import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.job.model.JobModel; @@ -29,6 +34,7 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; +import org.apache.samza.operators.impl.store.TimestampedValue; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; import 
org.apache.samza.operators.spec.OperatorSpec; @@ -36,8 +42,9 @@ import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.PartitionByOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.operators.spec.SendToTableOperatorSpec; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskContext; @@ -45,14 +52,9 @@ import org.apache.samza.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; /** * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s. 
@@ -212,6 +214,10 @@ public class OperatorImplGraph { return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock); } else if (operatorSpec instanceof JoinOperatorSpec) { return createPartialJoinOperatorImpl(prevOperatorSpec, (JoinOperatorSpec) operatorSpec, config, context, clock); + } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) { + return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, config, context); + } else if (operatorSpec instanceof SendToTableOperatorSpec) { + return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, config, context); } throw new IllegalArgumentException( String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java new file mode 100644 index 0000000..5ce1328 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.SendToTableOperatorSpec; +import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation of a send-stream-to-table operator that stores the record + * in the table. + * + * @param <K> the type of the record key + * @param <V> the type of the record value + */ +public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void> { + + private final SendToTableOperatorSpec<K, V> sendToTableOpSpec; + private final ReadWriteTable<K, V> table; + + SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Config config, TaskContext context) { + this.sendToTableOpSpec = sendToTableOpSpec; + this.table = (ReadWriteTable) context.getTable(sendToTableOpSpec.getTableSpec().getId()); + } + + @Override + protected void handleInit(Config config, TaskContext context) { + } + + @Override + protected Collection<Void> handleMessage(KV<K, V> message, MessageCollector collector, TaskCoordinator coordinator) { + table.put(message.getKey(), message.getValue()); + // there should be no further chained operators since this is a terminal operator. 
+ return Collections.emptyList(); + } + + @Override + protected void handleClose() { + table.close(); + } + + @Override + protected OperatorSpec<KV<K, V>, Void> getOperatorSpec() { + return sendToTableOpSpec; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java new file mode 100644 index 0000000..54a5770 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; +import org.apache.samza.table.ReadableTable; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation of a stream-table join operator that first extracts the join key from the + * incoming message, looks up the matching record in the table, and then applies the join function. + * + * @param <K> type of the join key + * @param <M> type of input messages + * @param <R> type of the table record + * @param <JM> type of the join result + */ +class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M, JM> { + + private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec; + private final ReadableTable<K, ?> table; + + StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, + Config config, TaskContext context) { + this.joinOpSpec = joinOpSpec; + this.table = (ReadableTable) context.getTable(joinOpSpec.getTableSpec().getId()); + } + + @Override + protected void handleInit(Config config, TaskContext context) { + this.joinOpSpec.getJoinFn().init(config, context); + } + + @Override + public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { + K key = joinOpSpec.getJoinFn().getMessageKey(message); + Object recordValue = table.get(key); + R record = recordValue != null ? (R) KV.of(key, recordValue) : null; + JM output = joinOpSpec.getJoinFn().apply(message, record); + + // The support for inner and outer join will be provided in the joinFn. For inner join, the joinFn might + // return null when the corresponding record is absent in the table. + return output != null ?
+ Collections.singletonList(output) + : Collections.emptyList(); + } + + @Override + protected void handleClose() { + this.joinOpSpec.getJoinFn().close(); + } + + protected OperatorSpec<M, JM> getOperatorSpec() { + return joinOpSpec; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 17f1b49..2a5991c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -18,17 +18,17 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.functions.WatermarkFunction; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.MessageStreamImpl; - import java.util.Collection; import java.util.LinkedHashSet; import java.util.Set; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.functions.WatermarkFunction; + /** - * A stream operator specification that holds all the information required to transform + * A stream operator specification that holds all the information required to transform * the input {@link org.apache.samza.operators.MessageStreamImpl} and produce the output * {@link org.apache.samza.operators.MessageStreamImpl}. 
* http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index 1b3b8aa..c752fe2 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -19,6 +19,10 @@ package org.apache.samza.operators.spec; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.Function; + import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.FilterFunction; @@ -26,15 +30,13 @@ import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.TableSpec; import org.apache.samza.task.TaskContext; -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.Function; - /** * Factory methods for creating {@link OperatorSpec} instances. @@ -242,4 +244,38 @@ public class OperatorSpecs { }, OperatorSpec.OpCode.MERGE, opId); } + + /** + * Creates a {@link StreamTableJoinOperatorSpec} with a join function. 
+ * + * @param tableSpec the table spec for the table on the right side of the join + * @param joinFn the user-defined join function to get join keys and results + * @param opId the unique ID of the operator + * @param <K> the type of join key + * @param <M> the type of input messages + * @param <R> the type of table record + * @param <JM> the type of the join result + * @return the {@link StreamTableJoinOperatorSpec} + */ + public static <K, M, R, JM> StreamTableJoinOperatorSpec<K, M, R, JM> createStreamTableJoinOperatorSpec( + TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) { + return new StreamTableJoinOperatorSpec(tableSpec, joinFn, opId); + } + + /** + * Creates a {@link SendToTableOperatorSpec} for writing the messages of the input stream to a table; + * the type of incoming message is expected to be {@code KV<K, V>}. + * + * @param inputOpSpec the operator spec for the input stream + * @param tableSpec the table spec for the underlying table + * @param opId the unique ID of the operator + * @param <K> the type of the table record key + * @param <V> the type of the table record value + * @return the {@link SendToTableOperatorSpec} + */ + public static <K, V> SendToTableOperatorSpec<K, V> createSendToTableOperatorSpec( + OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) { + return new SendToTableOperatorSpec(inputOpSpec, tableSpec, opId); + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java new file mode 100644 index 0000000..9084be2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java @@ -0,0 +1,65 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.table.TableSpec; + + +/** + * The spec for an operator that writes a stream to a table by extracting keys and values + * from the incoming messages. + * + * @param <K> the type of the table record key + * @param <V> the type of the table record value + */ +@InterfaceStability.Unstable +public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void> { + + private final OperatorSpec<?, KV<K, V>> inputOpSpec; + private final TableSpec tableSpec; + + /** + * Constructor for a {@link SendToTableOperatorSpec}.
+ * + * @param inputOpSpec the operator spec of the input stream + * @param tableSpec the table spec of the table written to + * @param opId the unique ID for this operator + */ + SendToTableOperatorSpec(OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) { + super(OpCode.SEND_TO, opId); + this.inputOpSpec = inputOpSpec; + this.tableSpec = tableSpec; + } + + public OperatorSpec<?, KV<K, V>> getInputOpSpec() { + return inputOpSpec; + } + + public TableSpec getTableSpec() { + return tableSpec; + } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; + } +}
