http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java new file mode 100644 index 0000000..730913a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.StreamTableJoinFunction; +import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.table.TableSpec; + + +/** + * The spec for stream-table join operator that retrieves a record from the table using key + * derived from the incoming message and joins with the incoming message. 
+ * + * @param <M> the type of input messages + * @param <R> the type of table record + * @param <JM> the type of join result + */ +@InterfaceStability.Unstable +public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, JM> { + + private final TableSpec tableSpec; + private final StreamTableJoinFunction<K, M, R, JM> joinFn; + + /** + * Constructor for {@link StreamTableJoinOperatorSpec}. + * + * @param tableSpec the table spec for the table on the right side of the join + * @param joinFn the user-defined join function to get join keys and results + * @param opId the unique ID for this operator + */ + StreamTableJoinOperatorSpec(TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) { + super(OpCode.JOIN, opId); + this.tableSpec = tableSpec; + this.joinFn = joinFn; + } + + public TableSpec getTableSpec() { + return tableSpec; + } + + public StreamTableJoinFunction<K, M, R, JM> getJoinFn() { + return this.joinFn; + } + + @Override + public WatermarkFunction getWatermarkFn() { + return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null; + } + +}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/table/TableManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java new file mode 100644 index 0000000..c3555f3 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.storage.StorageEngine; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link TableManager} manages tables within a Samza task. For each table, it maintains + * the {@link TableSpec} and the {@link TableProvider}. 
It is used at execution for + * {@link org.apache.samza.container.TaskInstance} to retrieve table instances for + * read/write operations. + * + * A {@link TableManager} is constructed from job configuration, the {@link TableSpec} + * and {@link TableProvider} are constructed by processing the job configuration. + * + * After a {@link TableManager} is constructed, local tables are associated with + * local store instances created during {@link org.apache.samza.container.SamzaContainer} + * initialization. + * + * Method {@link TableManager#getTable(String)} will throw {@link IllegalStateException}, + * if it's called before initialization. + * + * For store backed tables, the list of stores must be injected into the constructor. + */ +public class TableManager { + + static public class TableCtx { + private TableSpec tableSpec; + private TableProvider tableProvider; + } + + private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName()); + + // tableId -> TableCtx + private final Map<String, TableCtx> tables = new HashMap<>(); + + private boolean localTablesInitialized; + + /** + * Construct a table manager instance + * @param config the job configuration + * @param serdes Serde instances for tables + */ + public TableManager(Config config, Map<String, Serde<Object>> serdes) { + + new JavaTableConfig(config).getTableIds().forEach(tableId -> { + + // Construct the table provider + String tableProviderFactory = config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId)); + + // Construct the KVSerde + JavaTableConfig tableConfig = new JavaTableConfig(config); + KVSerde serde = KVSerde.of( + serdes.get(tableConfig.getKeySerde(tableId)), + serdes.get(tableConfig.getValueSerde(tableId))); + + TableSpec tableSpec = new TableSpec(tableId, serde, tableProviderFactory, + config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + ".")); + + addTable(tableSpec); + + logger.info("Added table " + tableSpec.getId()); + }); + } + + 
/** + * Initialize all local table + * @param stores stores created locally + */ + public void initLocalTables(Map<String, StorageEngine> stores) { + tables.values().forEach(ctx -> { + if (ctx.tableProvider instanceof LocalStoreBackedTableProvider) { + StorageEngine store = stores.get(ctx.tableSpec.getId()); + if (store == null) { + throw new SamzaException(String.format( + "Backing store for table %s was not injected by SamzaContainer", + ctx.tableSpec.getId())); + } + ((LocalStoreBackedTableProvider) ctx.tableProvider).init(store); + } + }); + + localTablesInitialized = true; + } + + /** + * Add a table to the table manager + * @param tableSpec the table spec + */ + private void addTable(TableSpec tableSpec) { + if (tables.containsKey(tableSpec.getId())) { + throw new SamzaException("Table " + tableSpec.getId() + " already exists"); + } + TableCtx ctx = new TableCtx(); + TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName()); + ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec); + ctx.tableSpec = tableSpec; + tables.put(tableSpec.getId(), ctx); + } + + /** + * Start the table manager, internally it starts all tables + */ + public void start() { + tables.values().forEach(ctx -> ctx.tableProvider.start()); + } + + /** + * Shutdown the table manager, internally it shuts down all tables + */ + public void shutdown() { + tables.values().forEach(ctx -> ctx.tableProvider.stop()); + } + + /** + * Get a table instance + * @param tableId Id of the table + * @return table instance + */ + public Table getTable(String tableId) { + if (!localTablesInitialized) { + throw new IllegalStateException("Local tables in TableManager not initialized."); + } + return tables.get(tableId).tableProvider.getTable(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 412e9dc..f465bfc 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -20,65 +20,37 @@ package org.apache.samza.container import java.io.File +import java.net.{URL, UnknownHostException} import java.nio.file.Path import java.util -import java.util.concurrent.{ExecutorService, Executors, TimeUnit} -import java.net.{URL, UnknownHostException} import java.util.Base64 +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} -import org.apache.samza.{SamzaContainerStatus, SamzaException} import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer -import org.apache.samza.config._ import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.container.disk.DiskQuotaPolicyFactory -import org.apache.samza.container.disk.DiskSpaceMonitor +import org.apache.samza.config._ import org.apache.samza.container.disk.DiskSpaceMonitor.Listener -import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory -import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor +import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor} import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor} import 
org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory -import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel -import org.apache.samza.metrics.JmxServer -import org.apache.samza.metrics.JvmMetrics -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.serializers.IntermediateMessageSerde -import org.apache.samza.serializers.NoOpSerde -import org.apache.samza.serializers.SerializableSerde -import org.apache.samza.serializers.Serde -import org.apache.samza.serializers.SerdeFactory -import org.apache.samza.serializers.SerdeManager -import org.apache.samza.serializers.StringSerde +import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter} +import org.apache.samza.serializers._ import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.storage.StorageEngineFactory -import org.apache.samza.storage.TaskStorageManager -import org.apache.samza.system.StreamMetadataCache -import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.SystemConsumersMetrics -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemProducers -import org.apache.samza.system.SystemProducersMetrics -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.chooser.DefaultChooser -import org.apache.samza.system.chooser.MessageChooserFactory -import org.apache.samza.system.chooser.RoundRobinChooserFactory +import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager} +import org.apache.samza.system._ +import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory} +import org.apache.samza.table.TableManager import org.apache.samza.task._ -import org.apache.samza.util.HighResolutionClock -import org.apache.samza.util.ExponentialSleepStrategy -import 
org.apache.samza.util.Logging -import org.apache.samza.util.Throttleable -import org.apache.samza.util.MetricsReporterLoader -import org.apache.samza.util.SystemClock -import org.apache.samza.util.Util import org.apache.samza.util.Util.asScalaClock +import org.apache.samza.util._ +import org.apache.samza.{SamzaContainerStatus, SamzaException} import scala.collection.JavaConverters._ @@ -568,6 +540,11 @@ object SamzaContainer extends Logging { new StorageConfig(config).getChangeLogDeleteRetentionsInMs, new SystemClock) + val tableManager = new TableManager(config, serdes.asJava) + tableManager.initLocalTables(taskStores.asJava) + + info("Got table manager"); + val systemStreamPartitions = taskModel .getSystemStreamPartitions .asScala @@ -586,6 +563,7 @@ object SamzaContainer extends Logging { containerContext = containerContext, offsetManager = offsetManager, storageManager = storageManager, + tableManager = tableManager, reporters = reporters, systemStreamPartitions = systemStreamPartitions, exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config), @@ -711,6 +689,7 @@ class SamzaContainer( startOffsetManager startLocalityManager startStores + startTableManager startDiskSpaceMonitor startHostStatisticsMonitor startProducers @@ -745,6 +724,7 @@ class SamzaContainer( shutdownConsumers shutdownTask + shutdownTableManager shutdownStores shutdownDiskSpaceMonitor shutdownHostStatisticsMonitor @@ -885,9 +865,9 @@ class SamzaContainer( } def startStores { - info("Starting task instance stores.") taskInstances.values.foreach(taskInstance => { val startTime = System.currentTimeMillis() + info("Starting stores in task instance %s" format taskInstance.taskName) taskInstance.startStores // Measuring the time to restore the stores val timeToRestore = System.currentTimeMillis() - startTime @@ -898,6 +878,13 @@ class SamzaContainer( }) } + def startTableManager: Unit = { + taskInstances.values.foreach(taskInstance => { + info("Starting table manager in task 
instance %s" format taskInstance.taskName) + taskInstance.startTableManager + }) + } + def startTask { info("Initializing stream tasks.") @@ -1003,6 +990,12 @@ class SamzaContainer( taskInstances.values.foreach(_.shutdownStores) } + def shutdownTableManager: Unit = { + info("Shutting down task instance table manager.") + + taskInstances.values.foreach(_.shutdownTableManager) + } + def shutdownLocalityManager { if(localityManager != null) { info("Shutting down locality manager.") http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index acec365..f2a5074 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -27,20 +27,9 @@ import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.job.model.JobModel import org.apache.samza.metrics.MetricsReporter import org.apache.samza.storage.TaskStorageManager -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.StreamMetadataCache -import org.apache.samza.system.SystemAdmin -import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.task.AsyncStreamTask -import org.apache.samza.task.ClosableTask -import org.apache.samza.task.EndOfStreamListenerTask -import org.apache.samza.task.InitableTask -import org.apache.samza.task.ReadableCoordinator -import org.apache.samza.task.StreamTask -import org.apache.samza.task.TaskCallbackFactory -import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.task.WindowableTask +import org.apache.samza.system._ +import 
org.apache.samza.table.TableManager +import org.apache.samza.task._ import org.apache.samza.util.Logging import scala.collection.JavaConverters._ @@ -56,6 +45,7 @@ class TaskInstance( containerContext: SamzaContainerContext, val offsetManager: OffsetManager = new OffsetManager, storageManager: TaskStorageManager = null, + tableManager: TableManager = null, reporters: Map[String, MetricsReporter] = Map(), val systemStreamPartitions: Set[SystemStreamPartition] = Set(), val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler, @@ -68,7 +58,7 @@ class TaskInstance( val isAsyncTask = task.isInstanceOf[AsyncStreamTask] val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager, - storageManager, jobModel, streamMetadataCache) + storageManager, tableManager, jobModel, streamMetadataCache) // store the (ssp -> if this ssp is catched up) mapping. "catched up" // means the same ssp in other taskInstances have the same offset as @@ -101,6 +91,16 @@ class TaskInstance( } } + def startTableManager { + if (tableManager != null) { + debug("Starting table manager for taskName: %s" format taskName) + + tableManager.start + } else { + debug("Skipping table manager initialization for taskName: %s" format taskName) + } + } + def initTask { if (isInitableTask) { debug("Initializing task for taskName: %s" format taskName) @@ -225,6 +225,16 @@ class TaskInstance( } } + def shutdownTableManager { + if (tableManager != null) { + debug("Shutting down table manager for taskName: %s" format taskName) + + tableManager.shutdown + } else { + debug("Skipping table manager shutdown for taskName: %s" format taskName) + } + } + override def toString() = "TaskInstance for class %s and taskName %s." 
format (task.getClass.getName, taskName) def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s endofstreamlistener=%s]" format http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java new file mode 100644 index 0000000..2775ca7 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.config; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.junit.Test; + +import com.google.common.collect.Sets; + +import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + + +public class TestJavaTableConfig { + @Test + public void testGetTableIds() { + Set<String> ids = Sets.newHashSet("t1", "t2"); + Map<String, String> map = ids.stream() + .map(id -> String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, id)) + .collect(Collectors.toMap(key -> key, key -> key + "-provider-factory")); + JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map)); + + assertEquals(2, tableConfig.getTableIds().size()); + + ids.removeAll(tableConfig.getTableIds()); + assertTrue(ids.isEmpty()); + } + + @Test + public void testGetTableProperties() { + Map<String, String> map = new HashMap<>(); + map.put("tables.t1.spec", "t1-spec"); + map.put("tables.t1.provider.factory", "t1-provider-factory"); + JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map)); + assertEquals("t1-provider-factory", tableConfig.getTableProviderFactory("t1")); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index d97d494..96e234e 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -18,7 +18,12 @@ */ package org.apache.samza.operators; -import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import 
java.util.function.Function; +import java.util.function.Supplier; + import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FilterFunction; @@ -27,14 +32,17 @@ import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.spec.PartitionByOperatorSpec; +import org.apache.samza.operators.spec.SendToTableOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import org.apache.samza.operators.windows.Window; @@ -42,14 +50,11 @@ import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; +import org.apache.samza.table.TableSpec; import org.junit.Test; import org.mockito.ArgumentCaptor; -import java.time.Duration; -import java.util.Collection; -import java.util.Collections; -import java.util.function.Function; -import java.util.function.Supplier; +import com.google.common.collect.ImmutableList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -313,6 +318,57 @@ 
public class TestMessageStreamImpl { } @Test + public void testSendToTable() { + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + OperatorSpec inputOpSpec = mock(OperatorSpec.class); + MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec); + + TableSpec tableSpec = new TableSpec(); + TableImpl table = new TableImpl(tableSpec); + + source.sendTo(table); + + ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); + verify(inputOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture()); + OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue(); + + assertTrue(registeredOpSpec instanceof SendToTableOperatorSpec); + SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) registeredOpSpec; + + assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode()); + assertEquals(inputOpSpec, sendToTableOperatorSpec.getInputOpSpec()); + assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec()); + } + + @Test + public void testStreamTableJoin() { + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + OperatorSpec leftInputOpSpec = mock(OperatorSpec.class); + MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec); + OperatorSpec rightInputOpSpec = mock(OperatorSpec.class); + MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec); + + TableSpec tableSpec = new TableSpec(); + TableImpl table = new TableImpl(tableSpec); + + source2.sendTo(table); + + StreamTableJoinFunction<String, KV<String, TestMessageEnvelope>, KV<String, TestMessageEnvelope>, TestOutputMessageEnvelope> + mockJoinFn = mock(StreamTableJoinFunction.class); + source1.join(table, mockJoinFn); + + ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); + 
verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture()); + OperatorSpec<?, TestMessageEnvelope> leftRegisteredOpSpec = leftRegisteredOpCaptor.getValue(); + + assertTrue(leftRegisteredOpSpec instanceof StreamTableJoinOperatorSpec); + StreamTableJoinOperatorSpec joinOpSpec = (StreamTableJoinOperatorSpec) leftRegisteredOpSpec; + assertEquals(OpCode.JOIN, joinOpSpec.getOpCode()); + assertEquals(mockJoinFn, joinOpSpec.getJoinFn()); + assertEquals(tableSpec, joinOpSpec.getTableSpec()); + } + + @Test public void testMerge() { StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); OperatorSpec mockOpSpec1 = mock(OperatorSpec.class); http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java index cf0a198..3bb44b5 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java @@ -12,14 +12,16 @@ * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for THE * specific language governing permissions and limitations * under the License. 
*/ package org.apache.samza.operators; -import com.google.common.collect.ImmutableList; -import junit.framework.Assert; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -33,10 +35,11 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.TableSpec; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; +import com.google.common.collect.ImmutableList; +import junit.framework.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -583,4 +586,16 @@ public class TestStreamGraphImpl { Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2); Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3); } + + @Test + public void testGetTable() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + + BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); + when(mockTableDescriptor.getTableSpec()).thenReturn( + new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>())); + Assert.assertNotNull(graph.getTable(mockTableDescriptor)); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java new file mode 100644 index 0000000..d8b2e8d --- 
/dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import java.util.Collection; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.functions.StreamTableJoinFunction; +import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; +import org.apache.samza.table.ReadableTable; +import org.apache.samza.table.TableSpec; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import junit.framework.Assert; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestStreamTableJoinOperatorImpl { + @Test + public void testHandleMessage() { + + String tableId = "t1"; + TableSpec tableSpec = mock(TableSpec.class); + when(tableSpec.getId()).thenReturn(tableId); + + StreamTableJoinOperatorSpec mockJoinOpSpec = 
mock(StreamTableJoinOperatorSpec.class); + when(mockJoinOpSpec.getTableSpec()).thenReturn(tableSpec); + when(mockJoinOpSpec.getJoinFn()).thenReturn( + new StreamTableJoinFunction<String, KV<String, String>, KV<String, String>, String>() { + @Override + public String apply(KV<String, String> message, KV<String, String> record) { + if ("1".equals(message.getKey())) { + Assert.assertEquals("m1", message.getValue()); + Assert.assertEquals("r1", record.getValue()); + return "m1r1"; + } else if ("2".equals(message.getKey())) { + Assert.assertEquals("m2", message.getValue()); + Assert.assertNull(record); + return null; + } + throw new SamzaException("Should never reach here!"); + } + + @Override + public String getMessageKey(KV<String, String> message) { + return message.getKey(); + } + + @Override + public String getRecordKey(KV<String, String> record) { + return record.getKey(); + } + }); + Config config = mock(Config.class); + ReadableTable table = mock(ReadableTable.class); + when(table.get("1")).thenReturn("r1"); + when(table.get("2")).thenReturn(null); + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getTable(tableId)).thenReturn(table); + + MessageCollector mockMessageCollector = mock(MessageCollector.class); + TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class); + + StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl( + mockJoinOpSpec, config, mockTaskContext); + + // Table has the key + Collection<TestMessageEnvelope> result; + result = streamTableJoinOperator.handleMessage(KV.of("1", "m1"), mockMessageCollector, mockTaskCoordinator); + Assert.assertEquals(1, result.size()); + Assert.assertEquals("m1r1", result.iterator().next()); + // Table doesn't have the key + result = streamTableJoinOperator.handleMessage(KV.of("2", "m2"), mockMessageCollector, mockTaskCoordinator); + Assert.assertEquals(0, result.size()); + } + +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
new file mode 100644
index 0000000..df5b9e5
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.lang.reflect.Field;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.StorageEngine;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests that {@link TableManager} builds its tables from configuration and
+ * rejects configurations that lack the provider factory or one of the serdes.
+ */
+public class TestTableManager {
+
+  private static final String TABLE_ID = "t1";
+
+  /**
+   * Provider factory referenced by class name from the test configuration. The
+   * mocks it creates are kept in static fields so the test can inspect them.
+   */
+  public static class DummyTableProviderFactory implements TableProviderFactory {
+
+    static Table table;
+    static LocalStoreBackedTableProvider tableProvider;
+
+    @Override
+    public TableProvider getTableProvider(TableSpec tableSpec) {
+      table = mock(Table.class);
+      tableProvider = mock(LocalStoreBackedTableProvider.class);
+      when(tableProvider.getTable()).thenReturn(table);
+      return tableProvider;
+    }
+  }
+
+  /** Complete configuration: provider factory, an extra table config entry and both serdes. */
+  @Test
+  public void testInitByConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
+    map.put(String.format("tables.%s.some.config", TABLE_ID), "xyz");
+    addKeySerde(map);
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  /** Both serdes are configured but the provider factory is not. */
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutProviderFactory() {
+    Map<String, String> map = new HashMap<>();
+    addKeySerde(map);
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  /** Provider factory and value serde are configured; the key serde is left out. */
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutKeySerde() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  /** Provider factory and key serde are configured; the value serde is left out. */
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutValueSerde() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
+    // Only the key serde is added here; adding the value serde instead would make
+    // this an exact copy of testInitFailsWithoutKeySerde.
+    addKeySerde(map);
+    doTestInit(map);
+  }
+
+  /** Looking up a table before initLocalTables() has been called must fail. */
+  @Test(expected = IllegalStateException.class)
+  public void testInitFailsWithoutInitializingLocalTables() {
+    TableManager tableManager = new TableManager(new MapConfig(new HashMap<>()), new HashMap<>());
+    tableManager.getTable("dummy");
+  }
+
+  /**
+   * Builds a {@link TableManager} from the given configuration, initializes its
+   * local tables and checks the resulting table, provider and table spec.
+   *
+   * @param map the job configuration to initialize the table manager with
+   */
+  private void doTestInit(Map<String, String> map) {
+    Map<String, StorageEngine> storageEngines = new HashMap<>();
+    storageEngines.put(TABLE_ID, mock(StorageEngine.class));
+
+    // Rebuild the serde instances from their Base64-serialized form in the config.
+    Map<String, Serde<Object>> serdeMap = new HashMap<>();
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde();
+    map.keySet().stream()
+        .filter(k -> k.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX()))
+        .forEach(k -> {
+            String serdeName = k
+                .replace(String.format(SerializerConfig.SERIALIZER_PREFIX(), ""), "")
+                .replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), "");
+            String serializedSerde = map.get(k);
+            byte[] bytes = Base64.getDecoder().decode(serializedSerde);
+            Serde serde = serializableSerde.fromBytes(bytes);
+            serdeMap.put(serdeName, serde);
+          });
+
+    TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
+    tableManager.initLocalTables(storageEngines);
+
+    Table table = tableManager.getTable(TABLE_ID);
+    verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject());
+    Assert.assertEquals(DummyTableProviderFactory.table, table);
+
+    // The per-table context is private to TableManager, hence the reflective access.
+    Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tables");
+    TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
+
+    TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
+    Assert.assertEquals(TABLE_ID, tableSpec.getId());
+    Assert.assertEquals(DummyTableProviderFactory.class.getName(), tableSpec.getTableProviderFactoryClassName());
+    Assert.assertEquals(IntegerSerde.class, tableSpec.getSerde().getKeySerde().getClass());
+    Assert.assertEquals(StringSerde.class, tableSpec.getSerde().getValueSerde().getClass());
+    Assert.assertEquals("xyz", tableSpec.getConfig().get("some.config"));
+
+    TableProvider tableProvider = getFieldValue(ctx, "tableProvider");
+    Assert.assertNotNull(tableProvider);
+  }
+
+  /** Registers a serialized {@link IntegerSerde} and points the table's key serde at it. */
+  private void addKeySerde(Map<String, String> map) {
+    String serdeId = "key-serde";
+    map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
+        serializeSerde(new IntegerSerde()));
+    map.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, TABLE_ID), serdeId);
+  }
+
+  /** Registers a serialized UTF-8 {@link StringSerde} and points the table's value serde at it. */
+  private void addValueSerde(Map<String, String> map) {
+    String serdeId = "value-serde";
+    map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
+        serializeSerde(new StringSerde("UTF-8")));
+    map.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, TABLE_ID), serdeId);
+  }
+
+  /** Serializes a serde instance and Base64-encodes the bytes for use as a config value. */
+  private String serializeSerde(Serde serde) {
+    return Base64.getEncoder().encodeToString(new SerializableSerde().toBytes(serde));
+  }
+
+  /**
+   * Reads a declared (possibly private) field of {@code object} by reflection.
+   * The field's accessibility flag is reset afterwards.
+   *
+   * @param object the instance to read from
+   * @param fieldName the name of a field declared directly on the object's class
+   * @param <T> the type the caller expects; the cast is unchecked
+   * @return the current field value
+   * @throws SamzaException if the field does not exist or cannot be read
+   */
+  private <T> T getFieldValue(Object object, String fieldName) {
+    Field field = null;
+    try {
+      field = object.getClass().getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return (T) field.get(object);
+    } catch (NoSuchFieldException | IllegalAccessException ex) {
+      throw new SamzaException(ex);
+    } finally {
+      if (field != null) {
+        field.setAccessible(false);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index b399f5f..28a4f8b 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.samza.Partition; import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.config.Config; @@ -47,16 +48,15 @@ import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.TestSystemConsumers; import org.junit.Before; import org.junit.Test; + import scala.Option; import scala.collection.JavaConverters; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.*; // TODO(spvenkat) SAMZA-1183: Fix all commented out tests. 
public class TestAsyncRunLoop { @@ -86,7 +86,7 @@ public class TestAsyncRunLoop { scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet(); return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics, null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class), - manager, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), null, null); + manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), null, null); } TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) { http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java new file mode 100644 index 0000000..2681fb3 --- /dev/null +++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.storage.kv.BaseLocalStoreBackedTableDescriptor;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for in-memory tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> {
+
+  /**
+   * Creates a descriptor for the in-memory table with the given ID.
+   *
+   * @param tableId the ID of the table
+   */
+  public InMemoryTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Fills in the table spec configuration. This descriptor currently has no
+   * settings of its own, so it contributes only what the base class does.
+   *
+   * @param tableSpecConfig the map to populate
+   */
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+  }
+
+  /**
+   * Validates this descriptor and creates a table spec that names
+   * {@link InMemoryTableProviderFactory} as the provider factory.
+   *
+   * @return the table spec
+   */
+  @Override
+  public TableSpec getTableSpec() {
+
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
new file mode 100644
index 0000000..c1c2f1c
--- /dev/null
+++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.kv.BaseLocalStoreBackedTableProvider;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table provider of an in-memory table
+ */
+public class InMemoryTableProvider extends BaseLocalStoreBackedTableProvider {
+
+  public InMemoryTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  /**
+   * Generates the configuration entries for this table: the store factory, the
+   * common store settings derived from {@code config}, and one entry per table
+   * spec config key. Keys starting with "inmemory." are re-homed under the
+   * table's store prefix with that marker removed; every other key is placed
+   * under the table's own prefix unchanged.
+   *
+   * @param config the configuration handed to generateCommonStoreConfig
+   * @return a new map holding the generated entries
+   */
+  @Override
+  public Map<String, String> generateConfig(Map<String, String> config) {
+
+    final String tableId = tableSpec.getId();
+    final Map<String, String> generated = new HashMap<>();
+
+    // Store factory first, then whatever is common to all local stores.
+    String factoryKey = String.format(StorageConfig.FACTORY(), tableId);
+    generated.put(factoryKey, InMemoryKeyValueStorageEngineFactory.class.getName());
+    generated.putAll(generateCommonStoreConfig(config));
+
+    // Finally translate each table spec entry into its fully-qualified key.
+    for (Map.Entry<String, String> entry : tableSpec.getConfig().entrySet()) {
+      String specKey = entry.getKey();
+      String scope;
+      String name;
+      if (specKey.startsWith("inmemory.")) {
+        scope = String.format("stores.%s", tableId);
+        name = specKey.substring("inmemory.".length());
+      } else {
+        scope = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId);
+        name = specKey;
+      }
+      generated.put(scope + "." + name, entry.getValue());
+    }
+
+    logger.info("Generated configuration for table " + tableId);
+
+    return generated;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
new file mode 100644
index 0000000..f05982a
--- /dev/null
+++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/**
+ * Factory class for an in-memory table provider
+ */
+public class InMemoryTableProviderFactory implements TableProviderFactory {
+
+  /**
+   * Creates a new {@link InMemoryTableProvider} for the given table spec.
+   *
+   * @param tableSpec the spec of the table the provider will serve
+   * @return a fresh provider instance
+   */
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    TableProvider provider = new InMemoryTableProvider(tableSpec);
+    return provider;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
new file mode 100644
index 0000000..840fb70
--- /dev/null
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestInMemoryTableDescriptor {
+
+  /**
+   * Builds a table spec from a descriptor that has a serde and a single
+   * "inmemory."-prefixed config entry, then checks that the serdes are present
+   * and that the entry is retrievable from the spec under the prefixed key.
+   */
+  @Test
+  public void testTableSpec() {
+
+    TableSpec spec = new InMemoryTableDescriptor<Integer, String>("1")
+        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .withConfig("inmemory.abc", "xyz")
+        .getTableSpec();
+
+    Assert.assertNotNull(spec.getSerde());
+    Assert.assertNotNull(spec.getSerde().getKeySerde());
+    Assert.assertNotNull(spec.getSerde().getValueSerde());
+
+    String configured = getConfig(spec, "abc");
+    Assert.assertEquals("xyz", configured);
+  }
+
+  /**
+   * Looks up a table spec config value by its key without the "inmemory." prefix.
+   */
+  private String getConfig(TableSpec tableSpec, String key) {
+    String prefixedKey = "inmemory." + key;
+    return tableSpec.getConfig().get(prefixedKey);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
new file mode 100644
index 0000000..76b7a66
--- /dev/null
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestInMemoryTableProvider {
+
+  /**
+   * Checks the configuration generated for a table spec with two
+   * "inmemory."-prefixed entries and two plain entries: the serde names are
+   * carried over to the store, the store factory is the in-memory one,
+   * prefixed entries land under "stores.t1." and plain ones under "tables.t1.".
+   */
+  @Test
+  public void testGenerateConfig() {
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    tableSpecConfig.put("inmemory.c1", "c1-value");
+    tableSpecConfig.put("inmemory.c2", "c2-value");
+    tableSpecConfig.put("c3", "c3-value");
+    tableSpecConfig.put("c4", "c4-value");
+
+    TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
+        "my-table-provider-factory", tableSpecConfig);
+
+    Map<String, String> config = new HashMap<>();
+    config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    TableProvider tableProvider = new InMemoryTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(config);
+
+    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+    Assert.assertEquals(
+        InMemoryKeyValueStorageEngineFactory.class.getName(),
+        tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+    Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
+    Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+    Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
+    Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
new file mode 100644
index 0000000..2c62159
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for RocksDb backed tables
+ *
+ * <p>Every {@code with...} setter is optional. Only the settings that were
+ * explicitly set are written to the table spec configuration, each under its
+ * key constant prefixed with "rocksdb.".
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, RocksDbTableDescriptor<K, V>> {
+
+  public static final String WRITE_BATCH_SIZE = "write.batch.size";
+  public static final String OBJECT_CACHE_SIZE = "object.cache.size";
+  public static final String CONTAINER_CACHE_SIZE_BYTES = "container.cache.size.bytes";
+  public static final String CONTAINER_WRITE_BUFFER_SIZE_BYTES = "container.write.buffer.size.bytes";
+  public static final String ROCKSDB_COMPRESSION = "rocksdb.compression";
+  public static final String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
+  public static final String ROCKSDB_TTL_MS = "rocksdb.ttl.ms";
+  public static final String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
+  public static final String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
+  public static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
+  public static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+
+  // A null value means "not set": the entry is then left out of the table spec config.
+  protected Integer writeBatchSize;
+  protected Integer objectCacheSize;
+  private Integer cacheSize;
+  private Integer writeBufferSize;
+  private Integer blockSize;
+  private Integer ttl;
+  private Integer numWriteBuffers;
+  private Integer maxLogFileSize;
+  private Integer numLogFilesToKeep;
+  private String compressionType;
+  private String compactionStyle;
+
+  /**
+   * Creates a descriptor for the RocksDb backed table with the given ID.
+   *
+   * @param tableId the ID of the table
+   */
+  public RocksDbTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
+   * @param writeBatchSize write batch size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withWriteBatchSize(int writeBatchSize) {
+    this.writeBatchSize = writeBatchSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.object.cache.size</code> in Samza configuration guide
+   * @param objectCacheSize the object cache size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withObjectCacheSize(int objectCacheSize) {
+    this.objectCacheSize = objectCacheSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.container.cache.size.bytes</code> in Samza configuration guide
+   * @param cacheSize the cache size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCacheSize(int cacheSize) {
+    this.cacheSize = cacheSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.container.write.buffer.size.bytes</code> in Samza configuration guide
+   * @param writeBufferSize the write buffer size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withWriteBufferSize(int writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.compression</code> in Samza configuration guide
+   * @param compressionType the compression type
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCompressionType(String compressionType) {
+    this.compressionType = compressionType;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.block.size.bytes</code> in Samza configuration guide
+   * @param blockSize the block size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withBlockSize(int blockSize) {
+    this.blockSize = blockSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.ttl.ms</code> in Samza configuration guide
+   * @param ttl the time to live in milliseconds
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withTtl(int ttl) {
+    this.ttl = ttl;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.compaction.style</code> in Samza configuration guide
+   * @param compactionStyle the compaction style
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCompactionStyle(String compactionStyle) {
+    this.compactionStyle = compactionStyle;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in Samza configuration guide
+   * @param numWriteBuffers the number of write buffers
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withNumWriteBuffers(int numWriteBuffers) {
+    this.numWriteBuffers = numWriteBuffers;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.max.log.file.size.bytes</code> in Samza configuration guide
+   * @param maxLogFileSize the maximal log file size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxLogFileSize(int maxLogFileSize) {
+    this.maxLogFileSize = maxLogFileSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.keep.log.file.num</code> in Samza configuration guide
+   * @param numLogFilesToKeep the number of log files to keep
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withNumLogFilesToKeep(int numLogFilesToKeep) {
+    this.numLogFilesToKeep = numLogFilesToKeep;
+    return this;
+  }
+
+  /**
+   * Create a table spec based on this table description
+   * @return the table spec
+   */
+  @Override
+  public TableSpec getTableSpec() {
+
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  /**
+   * Adds the base class configuration and then one "rocksdb."-prefixed entry
+   * for every setting that has been set on this descriptor.
+   *
+   * @param tableSpecConfig the map to populate
+   */
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+
+    super.generateTableSpecConfig(tableSpecConfig);
+
+    if (writeBatchSize != null) {
+      addRocksDbConfig(tableSpecConfig, WRITE_BATCH_SIZE, writeBatchSize.toString());
+    }
+    if (objectCacheSize != null) {
+      addRocksDbConfig(tableSpecConfig, OBJECT_CACHE_SIZE, objectCacheSize.toString());
+    }
+    if (cacheSize != null) {
+      addRocksDbConfig(tableSpecConfig, CONTAINER_CACHE_SIZE_BYTES, cacheSize.toString());
+    }
+    if (writeBufferSize != null) {
+      addRocksDbConfig(tableSpecConfig, CONTAINER_WRITE_BUFFER_SIZE_BYTES, writeBufferSize.toString());
+    }
+    if (compressionType != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPRESSION, compressionType);
+    }
+    if (blockSize != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_BLOCK_SIZE_BYTES, blockSize.toString());
+    }
+    if (ttl != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_TTL_MS, ttl.toString());
+    }
+    if (compactionStyle != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPACTION_STYLE, compactionStyle);
+    }
+    if (numWriteBuffers != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_NUM_WRITE_BUFFERS, numWriteBuffers.toString());
+    }
+    if (maxLogFileSize != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, maxLogFileSize.toString());
+    }
+    if (numLogFilesToKeep != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString());
+    }
+  }
+
+  /**
+   * Stores {@code value} in {@code map} under {@code key} prefixed with "rocksdb.".
+   */
+  private void addRocksDbConfig(Map<String, String> map, String key, String value) {
+    map.put("rocksdb." + key, value);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
new file mode 100644
index 0000000..eb8188f
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table provider for tables backed by RocksDb.
+ */ +public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider { + + public RocksDbTableProvider(TableSpec tableSpec) { + super(tableSpec); + } + + @Override + public Map<String, String> generateConfig(Map<String, String> config) { + + Map<String, String> tableConfig = new HashMap<>(); + + // Store factory configuration + tableConfig.put(String.format( + StorageConfig.FACTORY(), tableSpec.getId()), + RocksDbKeyValueStorageEngineFactory.class.getName()); + + // Common store configuration + tableConfig.putAll(generateCommonStoreConfig(config)); + + // Rest of the configuration + tableSpec.getConfig().forEach((k, v) -> { + String realKey = k.startsWith("rocksdb.") ? + String.format("stores.%s", tableSpec.getId()) + "." + k.substring("rocksdb.".length()) + : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k; + tableConfig.put(realKey, v); + }); + + logger.info("Generated configuration for table " + tableSpec.getId()); + + return tableConfig; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java new file mode 100644 index 0000000..dbe0f97 --- /dev/null +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import org.apache.samza.table.TableProvider; +import org.apache.samza.table.TableProviderFactory; +import org.apache.samza.table.TableSpec; + + +public class RocksDbTableProviderFactory implements TableProviderFactory { + @Override + public TableProvider getTableProvider(TableSpec tableSpec) { + return new RocksDbTableProvider(tableSpec); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java new file mode 100644 index 0000000..49fe6eb --- /dev/null +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.table.TableSpec; +import org.junit.Test; + +import junit.framework.Assert; + + +public class TestRocksDbTableDescriptor { + + @Test + public void testMinimal() { + new RocksDbTableDescriptor<Integer, String>("1") + .validate(); + } + + @Test + public void testSerde() { + TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1") + .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde())) + .getTableSpec(); + Assert.assertNotNull(tableSpec.getSerde()); + Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class); + Assert.assertEquals(tableSpec.getSerde().getValueSerde().getClass(), StringSerde.class); + } + + @Test + public void testTableSpec() { + + TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1") + .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde())) + .withBlockSize(1) + .withCacheSize(2) + .withCompactionStyle("fifo") + .withCompressionType("snappy") + .withMaxLogFileSize(3) + .withNumLogFilesToKeep(4) + .withNumWriteBuffers(5) + .withObjectCacheSize(6) + .withTtl(7) + .withWriteBatchSize(8) + .withWriteBufferSize(9) + .withConfig("rocksdb.abc", "xyz") + .getTableSpec(); + + Assert.assertNotNull(tableSpec.getSerde()); + Assert.assertNotNull(tableSpec.getSerde().getKeySerde()); + Assert.assertNotNull(tableSpec.getSerde().getValueSerde()); + 
Assert.assertEquals("1", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES)); + Assert.assertEquals("2", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES)); + Assert.assertEquals("3", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES)); + Assert.assertEquals("4", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM)); + Assert.assertEquals("5", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS)); + Assert.assertEquals("6", getConfig(tableSpec, RocksDbTableDescriptor.OBJECT_CACHE_SIZE)); + Assert.assertEquals("7", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_TTL_MS)); + Assert.assertEquals("8", getConfig(tableSpec, RocksDbTableDescriptor.WRITE_BATCH_SIZE)); + Assert.assertEquals("9", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES)); + Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION)); + Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE)); + Assert.assertEquals("xyz", getConfig(tableSpec, "abc")); + } + + private String getConfig(TableSpec tableSpec, String key) { + return tableSpec.getConfig().get("rocksdb." + key); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java new file mode 100644 index 0000000..beda5da --- /dev/null +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.table.TableProvider; +import org.apache.samza.table.TableSpec; +import org.junit.Test; + +import junit.framework.Assert; + + +public class TestRocksDbTableProvider { + @Test + public void testGenerateConfig() { + + Map<String, String> tableSpecConfig = new HashMap<>(); + tableSpecConfig.put("rocksdb.c1", "c1-value"); + tableSpecConfig.put("rocksdb.c2", "c2-value"); + tableSpecConfig.put("c3", "c3-value"); + tableSpecConfig.put("c4", "c4-value"); + + TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()), + "my-table-provider-factory", tableSpecConfig); + + Map<String, String> config = new HashMap<>(); + config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); + config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1"); + + TableProvider tableProvider = new RocksDbTableProvider(tableSpec); + Map<String, String> tableConfig = tableProvider.generateConfig(config); + + Assert.assertEquals("ks1", 
tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1"))); + Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1"))); + Assert.assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + tableConfig.get(String.format(StorageConfig.FACTORY(), "t1"))); + Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1")); + Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2")); + Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3")); + Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java new file mode 100644 index 0000000..1f9b57b --- /dev/null +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.storage.kv; + +import java.util.Map; + +import org.apache.samza.operators.BaseTableDescriptor; + + +/** + * Table descriptor for store backed tables. + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + * @param <D> the type of the concrete table descriptor + */ +abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>> + extends BaseTableDescriptor<K, V, D> { + + /** + * Constructs a table descriptor instance + * @param tableId Id of the table + */ + public BaseLocalStoreBackedTableDescriptor(String tableId) { + super(tableId); + } + + @Override + protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) { + super.generateTableSpecConfig(tableSpecConfig); + } + + /** + * Validate that this table descriptor is constructed properly + */ + protected void validate() { + super.validate(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java new file mode 100644 index 0000000..4af0f1d --- /dev/null +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.storage.StorageEngine; +import org.apache.samza.table.LocalStoreBackedTableProvider; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base class for tables backed by Samza stores, see {@link LocalStoreBackedTableProvider}. 
+ */ +abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBackedTableProvider { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + protected final TableSpec tableSpec; + + protected KeyValueStore kvStore; + + public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) { + this.tableSpec = tableSpec; + } + + @Override + public void init(StorageEngine store) { + kvStore = (KeyValueStore) store; + logger.info("Initialized backing store for table " + tableSpec.getId()); + } + + @Override + public Table getTable() { + if (kvStore == null) { + throw new SamzaException("Store not initialized for table " + tableSpec.getId()); + } + return new LocalStoreBackedReadWriteTable(kvStore); + } + + @Override + public void start() { + logger.info("Starting table provider for table " + tableSpec.getId()); + } + + @Override + public void stop() { + logger.info("Stopping table provider for table " + tableSpec.getId()); + } + + protected Map<String, String> generateCommonStoreConfig(Map<String, String> config) { + + Map<String, String> storeConfig = new HashMap<>(); + + // We assume the configuration for serde are already generated for this table, + // so we simply carry them over to store configuration. 
+ // + JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(config)); + + String keySerde = tableConfig.getKeySerde(tableSpec.getId()); + storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde); + + String valueSerde = tableConfig.getValueSerde(tableSpec.getId()); + storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde); + + return storeConfig; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java new file mode 100644 index 0000000..3149c86 --- /dev/null +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.storage.kv; + +import java.util.List; + +import org.apache.samza.table.ReadWriteTable; + + +/** + * A store backed readable and writable table + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadableTable<K, V> + implements ReadWriteTable<K, V> { + + /** + * Constructs an instance of {@link LocalStoreBackedReadWriteTable} + * @param kvStore the backing store + */ + public LocalStoreBackedReadWriteTable(KeyValueStore kvStore) { + super(kvStore); + } + + @Override + public void put(K key, V value) { + kvStore.put(key, value); + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + entries.forEach(e -> kvStore.put(e.getKey(), e.getValue())); + } + + @Override + public void delete(K key) { + kvStore.delete(key); + } + + @Override + public void deleteAll(List<K> keys) { + keys.forEach(k -> kvStore.delete(k)); + } + + @Override + public void flush() { + kvStore.flush(); + } + +}
