http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java index 93a0521..8b6bc1a 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java @@ -21,8 +21,7 @@ package org.apache.samza.table.remote; import com.google.common.base.Preconditions; import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.context.Context; -import org.apache.samza.table.Table; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.descriptors.RemoteTableDescriptor; import org.apache.samza.table.retry.RetriableReadFunction; import org.apache.samza.table.retry.RetriableWriteFunction; @@ -45,9 +44,7 @@ import java.util.concurrent.ScheduledExecutorService; */ public class RemoteTableProvider extends BaseTableProvider { - private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>(); - - private boolean readOnly; + private final List<RemoteTable<?, ?>> tables = new ArrayList<>(); /** * Map of tableId -> executor service for async table IO and callbacks. The same executors @@ -63,21 +60,13 @@ public class RemoteTableProvider extends BaseTableProvider { } @Override - public void init(Context context) { - super.init(context); - JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig()); - this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null; - } - - @Override - public Table getTable() { + public ReadWriteTable getTable() { Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId)); - RemoteReadableTable table; - JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig()); + // Read part TableReadFunction readFn = getReadFn(tableConfig); RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER); if (rateLimiter != null) { @@ -86,34 +75,29 @@ public class RemoteTableProvider extends BaseTableProvider { TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN); TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG); - TableRateLimiter.CreditFunction<?, ?> writeCreditFn; - TableRateLimiter writeRateLimiter = null; - TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY); - TableRetryPolicy writeRetryPolicy = null; - - if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) { - retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> { - Thread thread = new Thread(runnable); - thread.setName("table-retry-executor"); - thread.setDaemon(true); - return thread; - }); - } - if (readRetryPolicy != null) { + if (retryExecutor == null) { + retryExecutor = createRetryExecutor(); + } readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor); } - TableWriteFunction writeFn = getWriteFn(tableConfig); - boolean isRateLimited = readRateLimiter.isRateLimited(); - if (!readOnly) { - writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN); + + // Write part + TableWriteFunction writeFn = getWriteFn(tableConfig); + TableRateLimiter writeRateLimiter = null; + TableRetryPolicy writeRetryPolicy = null; + if (writeFn != null) { + TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN); writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG); isRateLimited |= writeRateLimiter.isRateLimited(); writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY); if (writeRetryPolicy != null) { + if (retryExecutor == null) { + retryExecutor = createRetryExecutor(); + } writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor); } } @@ -140,13 +124,8 @@ public class RemoteTableProvider extends BaseTableProvider { })); } - if (readOnly) { - table = new RemoteReadableTable(tableId, readFn, readRateLimiter, - tableExecutors.get(tableId), callbackExecutors.get(tableId)); - } else { - table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter, - writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); - } + RemoteTable table = new RemoteTable(tableId, readFn, writeFn, readRateLimiter, + writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId); if (readRetryPolicy != null) { @@ -192,5 +171,14 @@ public class RemoteTableProvider extends BaseTableProvider { } return writeFn; } + + private ScheduledExecutorService createRetryExecutor() { + return Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("table-retry-executor"); + thread.setDaemon(true); + return thread; + }); + } }
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java new file mode 100644 index 0000000..df6833e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table.utils; + +import org.apache.samza.context.Context; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.Table; + + +/** + * Utility class that contains the default set of read metrics. + */ +public class TableMetrics { + + // Read metrics + public final Timer getNs; + public final Timer getAllNs; + public final Counter numGets; + public final Counter numGetAlls; + public final Counter numMissedLookups; + // Write metrics + public final Counter numPuts; + public final Timer putNs; + public final Counter numPutAlls; + public final Timer putAllNs; + public final Counter numDeletes; + public final Timer deleteNs; + public final Counter numDeleteAlls; + public final Timer deleteAllNs; + public final Counter numFlushes; + public final Timer flushNs; + + /** + * Constructor based on container and task container context + * + * @param context {@link Context} for this task + * @param table underlying table + * @param tableId table Id + */ + public TableMetrics(Context context, Table table, String tableId) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); + // Read metrics + numGets = tableMetricsUtil.newCounter("num-gets"); + getNs = tableMetricsUtil.newTimer("get-ns"); + numGetAlls = tableMetricsUtil.newCounter("num-getAlls"); + getAllNs = tableMetricsUtil.newTimer("getAll-ns"); + numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups"); + // Write metrics + numPuts = tableMetricsUtil.newCounter("num-puts"); + putNs = tableMetricsUtil.newTimer("put-ns"); + numPutAlls = tableMetricsUtil.newCounter("num-putAlls"); + putAllNs = tableMetricsUtil.newTimer("putAll-ns"); + numDeletes = tableMetricsUtil.newCounter("num-deletes"); + deleteNs = tableMetricsUtil.newTimer("delete-ns"); + numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls"); + deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns"); + numFlushes = tableMetricsUtil.newCounter("num-flushes"); + flushNs = tableMetricsUtil.newTimer("flush-ns"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java deleted file mode 100644 index e77fcfd..0000000 --- a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.table.utils; - -import org.apache.samza.context.Context; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.Table; - - -/** - * Utility class that contains the default set of read metrics. - */ -public class TableReadMetrics { - - public final Timer getNs; - public final Timer getAllNs; - public final Counter numGets; - public final Counter numGetAlls; - public final Counter numMissedLookups; - - /** - * Constructor based on container and task container context - * - * @param context {@link Context} for this task - * @param table underlying table - * @param tableId table Id - */ - public TableReadMetrics(Context context, Table table, String tableId) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); - numGets = tableMetricsUtil.newCounter("num-gets"); - getNs = tableMetricsUtil.newTimer("get-ns"); - numGetAlls = tableMetricsUtil.newCounter("num-getAlls"); - getAllNs = tableMetricsUtil.newTimer("getAll-ns"); - numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups"); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java deleted file mode 100644 index bf65b74..0000000 --- a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.table.utils; - -import org.apache.samza.context.Context; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.Table; - - -public class TableWriteMetrics { - - public final Counter numPuts; - public final Timer putNs; - public final Counter numPutAlls; - public final Timer putAllNs; - public final Counter numDeletes; - public final Timer deleteNs; - public final Counter numDeleteAlls; - public final Timer deleteAllNs; - public final Counter numFlushes; - public final Timer flushNs; - - /** - * Utility class that contains the default set of write metrics. - * - * @param context {@link Context} for this task - * @param table underlying table - * @param tableId table Id - */ - public TableWriteMetrics(Context context, Table table, String tableId) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); - numPuts = tableMetricsUtil.newCounter("num-puts"); - putNs = tableMetricsUtil.newTimer("put-ns"); - numPutAlls = tableMetricsUtil.newCounter("num-putAlls"); - putAllNs = tableMetricsUtil.newTimer("putAll-ns"); - numDeletes = tableMetricsUtil.newCounter("num-deletes"); - deleteNs = tableMetricsUtil.newTimer("delete-ns"); - numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls"); - deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns"); - numFlushes = tableMetricsUtil.newCounter("num-flushes"); - flushNs = tableMetricsUtil.newTimer("flush-ns"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java index 4112c8b..8fd161b 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java @@ -26,7 +26,7 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; -import org.apache.samza.table.ReadableTable; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; @@ -71,7 +71,7 @@ public class TestStreamTableJoinOperatorImpl { return record.getKey(); } }); - ReadableTable table = mock(ReadableTable.class); + ReadWriteTable table = mock(ReadWriteTable.class); when(table.get("1")).thenReturn("r1"); when(table.get("2")).thenReturn(null); Context context = new MockContext(); http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java index a3b1963..e60b6ff 100644 --- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java +++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java @@ -49,12 +49,12 @@ public class TestTableManager { public static class DummyTableProviderFactory implements TableProviderFactory { - static ReadableTable table; + static ReadWriteTable table; static TableProvider tableProvider; @Override public TableProvider getTableProvider(String tableId) { - table = mock(ReadableTable.class); + table = mock(ReadWriteTable.class); tableProvider = mock(TableProvider.class); when(tableProvider.getTable()).thenReturn(table); return tableProvider; http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java index 5a19767..c304bfd 100644 --- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java @@ -35,11 +35,10 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor; import org.apache.samza.table.descriptors.TableDescriptor; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.ReadableTable; import org.apache.samza.table.caching.guava.GuavaCacheTable; import org.apache.samza.table.descriptors.CachingTableDescriptor; import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor; -import org.apache.samza.table.remote.RemoteReadWriteTable; +import org.apache.samza.table.remote.RemoteTable; import org.apache.samza.table.remote.TableRateLimiter; import org.apache.samza.table.remote.TableReadFunction; import org.apache.samza.table.remote.TableWriteFunction; @@ -138,11 +137,11 @@ public class TestCachingTable { return Pair.of(cacheTable, cacheStore); } - private void initTables(ReadableTable ... tables) { + private void initTables(ReadWriteTable ... tables) { initTables(false, tables); } - private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) { + private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... tables) { Map<String, String> config = new HashMap<>(); if (isTimerMetricsDisabled) { config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); @@ -242,7 +241,7 @@ public class TestCachingTable { @Test public void testNonexistentKeyInTable() { - ReadableTable<String, String> table = mock(ReadableTable.class); + ReadWriteTable<String, String> table = mock(ReadWriteTable.class); doReturn(CompletableFuture.completedFuture(null)).when(table).getAsync(any()); ReadWriteTable<String, String> cache = getMockCache().getLeft(); CachingTable<String, String> cachingTable = new CachingTable<>("myTable", table, cache, false); @@ -255,7 +254,7 @@ public class TestCachingTable { @Test public void testKeyEviction() { - ReadableTable<String, String> table = mock(ReadableTable.class); + ReadWriteTable<String, String> table = mock(ReadWriteTable.class); doReturn(CompletableFuture.completedFuture("3")).when(table).getAsync(any()); ReadWriteTable<String, String> cache = mock(ReadWriteTable.class); @@ -283,7 +282,7 @@ public class TestCachingTable { TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class); TableReadFunction<String, String> readFn = mock(TableReadFunction.class); TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); - final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>( + final RemoteTable<String, String> remoteTable = new RemoteTable<>( tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor()); @@ -402,7 +401,7 @@ public class TestCachingTable { doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any()); doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any()); - final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>( + final RemoteTable<String, String> remoteTable = new RemoteTable<>( tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor()); http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java index 486295a..a764a8b 100644 --- a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java @@ -30,7 +30,7 @@ import org.apache.samza.context.Context; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.table.Table; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableProviderFactory; import org.junit.Test; @@ -143,7 +143,7 @@ public class TestLocalTableDescriptor { } @Override - public Table getTable() { + public ReadWriteTable getTable() { throw new SamzaException("Not implemented"); } http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java deleted file mode 100644 index d7733a8..0000000 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.table.remote; - -import junit.framework.Assert; -import org.apache.samza.context.Context; -import org.apache.samza.context.MockContext; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Gauge; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.Timer; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.table.retry.RetriableReadFunction; -import org.apache.samza.table.retry.RetriableWriteFunction; -import org.apache.samza.table.retry.TableRetryPolicy; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollection; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - - -public class TestRemoteReadWriteTable { - private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); - - public static Context getMockContext() { - Context context = new MockContext(); - MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString()); - doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString()); - doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any()); - doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry(); - return context; - } - - private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId, - TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) { - return getTable(tableId, readFn, writeFn, null); - } - - private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId, - TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) { - RemoteReadableTable<K, V> table; - - TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class); - TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class); - doReturn(true).when(readRateLimiter).isRateLimited(); - doReturn(true).when(writeRateLimiter).isRateLimited(); - - ExecutorService tableExecutor = Executors.newSingleThreadExecutor(); - - if (writeFn == null) { - table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor); - } else { - table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor); - } - - Context context = getMockContext(); - - table.init(context); - - return (T) table; - } - - private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception { - String tableId = "testGet-" + sync + error + retry; - TableReadFunction<String, String> readFn = mock(TableReadFunction.class); - // Sync is backed by async so needs to mock the async method - CompletableFuture<String> future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); - if (!retry) { - doReturn(future).when(readFn).getAsync(anyString()); - } else { - final int [] times = new int[] {0}; - doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar")) - .when(readFn).getAsync(anyString()); - } - } else { - future = CompletableFuture.completedFuture("bar"); - doReturn(future).when(readFn).getAsync(anyString()); - } - if (retry) { - doReturn(true).when(readFn).isRetriable(any()); - TableRetryPolicy policy = new TableRetryPolicy(); - readFn = new RetriableReadFunction<>(policy, readFn, schedExec); - } - RemoteReadableTable<String, String> table = getTable(tableId, readFn, null); - Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get()); - verify(table.readRateLimiter, times(1)).throttle(anyString()); - } - - @Test - public void testGet() throws Exception { - doTestGet(true, false, false); - } - - @Test - public void testGetAsync() throws Exception { - doTestGet(false, false, false); - } - - @Test(expected = ExecutionException.class) - public void testGetAsyncError() throws Exception { - doTestGet(false, true, false); - } - - @Test - public void testGetAsyncErrorRetried() throws Exception { - doTestGet(false, true, true); - } - - @Test - public void testGetMultipleTables() { - TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class); - TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class); - - // Sync is backed by async so needs to mock the async method - doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString()); - doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString()); - - RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null); - RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null); - - CompletableFuture<String> future1 = table1.getAsync("foo1"); - CompletableFuture<String> future2 = table2.getAsync("foo2"); - - CompletableFuture.allOf(future1, future2) - .thenAccept(u -> { - Assert.assertEquals(future1.join(), "bar1"); - Assert.assertEquals(future2.join(), "bar1"); - }); - } - - private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception { - String tableId = "testPut-" + sync + error + isDelete + retry; - TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class); - TableWriteFunction<String, String> writeFn = mockWriteFn; - CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null); - CompletableFuture<Void> failureFuture = new CompletableFuture(); - failureFuture.completeExceptionally(new RuntimeException("Test exception")); - if (!error) { - if (isDelete) { - doReturn(successFuture).when(writeFn).deleteAsync(any()); - } else { - doReturn(successFuture).when(writeFn).putAsync(any(), any()); - } - } else if (!retry) { - if (isDelete) { - doReturn(failureFuture).when(writeFn).deleteAsync(any()); - } else { - doReturn(failureFuture).when(writeFn).putAsync(any(), any()); - } - } else { - doReturn(true).when(writeFn).isRetriable(any()); - final int [] times = new int[] {0}; - if (isDelete) { - doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any()); - } else { - doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any()); - } - writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec); - } - RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn); - if (sync) { - table.put("foo", isDelete ? null : "bar"); - } else { - table.putAsync("foo", isDelete ? null : "bar").get(); - } - ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); - ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class); - if (isDelete) { - verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture()); - } else { - verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture()); - Assert.assertEquals("bar", valCaptor.getValue()); - } - Assert.assertEquals("foo", keyCaptor.getValue()); - if (isDelete) { - verify(table.writeRateLimiter, times(1)).throttle(anyString()); - } else { - verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString()); - } - } - - @Test - public void testPut() throws Exception { - doTestPut(true, false, false, false); - } - - @Test - public void testPutDelete() throws Exception { - doTestPut(true, false, true, false); - } - - @Test - public void testPutAsync() throws Exception { - doTestPut(false, false, false, false); - } - - @Test - public void testPutAsyncDelete() throws Exception { - doTestPut(false, false, true, false); - } - - @Test(expected = ExecutionException.class) - public void testPutAsyncError() throws Exception { - doTestPut(false, true, false, false); - } - - @Test - public void testPutAsyncErrorRetried() throws Exception { - doTestPut(false, true, false, true); - } - - private void doTestDelete(boolean sync, boolean error) throws Exception { - TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); - RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error, - mock(TableReadFunction.class), writeFn); - CompletableFuture<Void> future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); - } else { - future = CompletableFuture.completedFuture(null); - } - // Sync is backed by async so needs to mock the async method - doReturn(future).when(writeFn).deleteAsync(any()); - ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class); - if (sync) { - table.delete("foo"); - } else { - table.deleteAsync("foo").get(); - } - verify(writeFn, times(1)).deleteAsync(argCaptor.capture()); - Assert.assertEquals("foo", argCaptor.getValue()); - verify(table.writeRateLimiter, times(1)).throttle(anyString()); - } - - @Test - public void testDelete() throws Exception { - doTestDelete(true, false); - } - - @Test - public void testDeleteAsync() throws Exception { - doTestDelete(false, false); - } - - @Test(expected = ExecutionException.class) - public void testDeleteAsyncError() throws Exception { - doTestDelete(false, true); - } - - private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception { - TableReadFunction<String, String> readFn = mock(TableReadFunction.class); - Map<String, String> res = new HashMap<>(); - res.put("foo1", "bar1"); - if (!partial) { - res.put("foo2", "bar2"); - } - CompletableFuture<Map<String, String>> future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); - } else { - future = CompletableFuture.completedFuture(res); - } - // Sync is backed by async so needs to mock the async method - doReturn(future).when(readFn).getAllAsync(any()); - RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null); - Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2")) - : table.getAllAsync(Arrays.asList("foo1", "foo2")).get()); - verify(table.readRateLimiter, times(1)).throttle(anyCollection()); - } - - @Test - public void testGetAll() throws Exception { - doTestGetAll(true, false, false); - } - - @Test - public void testGetAllAsync() throws Exception { - doTestGetAll(false, false, false); - } - - @Test(expected = ExecutionException.class) - public void testGetAllAsyncError() throws Exception { - doTestGetAll(false, true, false); - } - - // Partial result is an acceptable scenario - @Test - public void testGetAllPartialResult() throws Exception { - doTestGetAll(false, false, true); - } - - public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception { - TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); - RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete, - mock(TableReadFunction.class), writeFn); - CompletableFuture<Void> future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); - } else { - future = CompletableFuture.completedFuture(null); - } - // Sync is backed by async so needs to mock the async method - doReturn(future).when(writeFn).putAllAsync(any()); - if (hasDelete) { - doReturn(future).when(writeFn).deleteAllAsync(any()); - } - List<Entry<String, String>> entries = Arrays.asList( - new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2")); - ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class); - if (sync) { - table.putAll(entries); - } else { - table.putAllAsync(entries).get(); - } - verify(writeFn, times(1)).putAllAsync(argCaptor.capture()); - if (hasDelete) { - ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class); - verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture()); - Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue()); - Assert.assertEquals(1, argCaptor.getValue().size()); - Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey()); - verify(table.writeRateLimiter, times(1)).throttle(anyCollection()); - } else { - Assert.assertEquals(entries, argCaptor.getValue()); - } - verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection()); - } - - @Test - public void testPutAll() throws Exception { - doTestPutAll(true, false, false); - } - - @Test - public void testPutAllHasDelete() throws Exception { - doTestPutAll(true, false, true); - } - - @Test - public void testPutAllAsync() throws Exception { - doTestPutAll(false, false, false); - } - - @Test - public void testPutAllAsyncHasDelete() throws Exception { - doTestPutAll(false, false, true); - } - - @Test(expected = ExecutionException.class) - public void testPutAllAsyncError() throws Exception { - doTestPutAll(false, true, false); - } - - public void doTestDeleteAll(boolean sync, boolean error) throws Exception { - TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); - RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error, - mock(TableReadFunction.class), writeFn); - CompletableFuture<Void> future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); - } else { - future = CompletableFuture.completedFuture(null); - } - // Sync is backed by async so needs to mock the async method - doReturn(future).when(writeFn).deleteAllAsync(any()); - List<String> keys = Arrays.asList("foo1", "foo2"); - ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class); - if (sync) { - table.deleteAll(keys); - } else { - table.deleteAllAsync(keys).get(); - } - verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture()); - Assert.assertEquals(keys, argCaptor.getValue()); - verify(table.writeRateLimiter, times(1)).throttle(anyCollection()); - } - - @Test - public void testDeleteAll() throws Exception { - doTestDeleteAll(true, false); - } - - @Test - public void testDeleteAllAsync() throws Exception { - doTestDeleteAll(false, false); - } - - @Test(expected = ExecutionException.class) - public void testDeleteAllAsyncError() throws Exception { - doTestDeleteAll(false, true); - } - - @Test - public void testFlush() { - TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); - RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn); - table.flush(); - verify(writeFn, times(1)).flush(); - } - - @Test - public void testGetWithCallbackExecutor() throws Exception { - TableReadFunction<String, String> readFn = mock(TableReadFunction.class); - // Sync is backed by async so needs to mock the async method - doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString()); - RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null, - Executors.newSingleThreadExecutor()); - Thread testThread = Thread.currentThread(); - - table.getAsync("foo").thenAccept(result -> { - Assert.assertEquals("bar", result); - // Must be executed on the executor thread - Assert.assertNotSame(testThread, Thread.currentThread()); - }); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java new file mode 100644 index 0000000..ae96d86 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.remote; + +import org.apache.samza.context.Context; +import org.apache.samza.context.MockContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; + +import org.junit.Assert; +import org.junit.Test; + +import org.mockito.ArgumentCaptor; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + + +public class TestRemoteTable { + private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); + + public static Context getMockContext() { + Context context = new MockContext(); + MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString()); + doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString()); + doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any()); + doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry(); + return context; + } + + private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId, + TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) { + return getTable(tableId, readFn, writeFn, null); + } + + private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId, + TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) { + RemoteTable<K, V> table; + + TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class); + TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class); + doReturn(true).when(readRateLimiter).isRateLimited(); + doReturn(true).when(writeRateLimiter).isRateLimited(); + + ExecutorService tableExecutor = Executors.newSingleThreadExecutor(); + + table = new RemoteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor); + + Context context = getMockContext(); + + table.init(context); + + return (T) table; + } + + private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception { + String tableId = "testGet-" + sync + error + retry; + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + // Sync is backed by async so needs to mock the async method + CompletableFuture<String> future; + if (error) { + future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("Test exception")); + if (!retry) { + doReturn(future).when(readFn).getAsync(anyString()); + } else { + final int [] times = new int[] {0}; + doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar")) + .when(readFn).getAsync(anyString()); + } + } else { + future = CompletableFuture.completedFuture("bar"); + doReturn(future).when(readFn).getAsync(anyString()); + } + if (retry) { + doReturn(true).when(readFn).isRetriable(any()); + TableRetryPolicy policy = new TableRetryPolicy(); + readFn = new RetriableReadFunction<>(policy, readFn, schedExec); + } + RemoteTable<String, String> table = getTable(tableId, readFn, null); + Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get()); + verify(table.readRateLimiter, times(1)).throttle(anyString()); + } + + @Test + public void testGet() throws Exception { + doTestGet(true, false, false); + } + + @Test + public void testGetAsync() throws Exception { + doTestGet(false, false, false); + } + + @Test(expected = ExecutionException.class) + public void testGetAsyncError() throws Exception { + doTestGet(false, true, false); + } + + @Test + public void testGetAsyncErrorRetried() throws Exception { + doTestGet(false, true, true); + } + + @Test + public void testGetMultipleTables() { + TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class); + TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class); + + // Sync is backed by async so needs to mock the async method + doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString()); + doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString()); + + RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null); + RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null); + + CompletableFuture<String> future1 = table1.getAsync("foo1"); + CompletableFuture<String> future2 = table2.getAsync("foo2"); + + CompletableFuture.allOf(future1, future2) + .thenAccept(u -> { + Assert.assertEquals(future1.join(), "bar1"); + Assert.assertEquals(future2.join(), "bar1"); + }); + } + + private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception { + String tableId = "testPut-" + sync + error + isDelete + retry; + TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class); + TableWriteFunction<String, String> writeFn = mockWriteFn; + CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null); + CompletableFuture<Void> failureFuture = new CompletableFuture(); + failureFuture.completeExceptionally(new RuntimeException("Test exception")); + if (!error) { + if (isDelete) { + doReturn(successFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(successFuture).when(writeFn).putAsync(any(), any()); + } + } else if (!retry) { + if (isDelete) { + doReturn(failureFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(failureFuture).when(writeFn).putAsync(any(), any()); + } + } else { + doReturn(true).when(writeFn).isRetriable(any()); + final int [] times = new int[] {0}; + if (isDelete) { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any()); + } else { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any()); + } + writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec); + } + RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn); + if (sync) { + table.put("foo", isDelete ? null : "bar"); + } else { + table.putAsync("foo", isDelete ? null : "bar").get(); + } + ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class); + if (isDelete) { + verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture()); + } else { + verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture()); + Assert.assertEquals("bar", valCaptor.getValue()); + } + Assert.assertEquals("foo", keyCaptor.getValue()); + if (isDelete) { + verify(table.writeRateLimiter, times(1)).throttle(anyString()); + } else { + verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString()); + } + } + + @Test + public void testPut() throws Exception { + doTestPut(true, false, false, false); + } + + @Test + public void testPutDelete() throws Exception { + doTestPut(true, false, true, false); + } + + @Test + public void testPutAsync() throws Exception { + doTestPut(false, false, false, false); + } + + @Test + public void testPutAsyncDelete() throws Exception { + doTestPut(false, false, true, false); + } + + @Test(expected = ExecutionException.class) + public void testPutAsyncError() throws Exception { + doTestPut(false, true, false, false); + } + + @Test + public void testPutAsyncErrorRetried() throws Exception { + doTestPut(false, true, false, true); + } + + private void doTestDelete(boolean sync, boolean error) throws Exception { + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + RemoteTable<String, String> table = getTable("testDelete-" + sync + error, + mock(TableReadFunction.class), writeFn); + CompletableFuture<Void> future; + if (error) { + future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("Test exception")); + } else { + future = CompletableFuture.completedFuture(null); + } + // Sync is backed by async so needs to mock the async method + doReturn(future).when(writeFn).deleteAsync(any()); + ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class); + if (sync) { + table.delete("foo"); + } else { + table.deleteAsync("foo").get(); + } + verify(writeFn, times(1)).deleteAsync(argCaptor.capture()); + Assert.assertEquals("foo", argCaptor.getValue()); + verify(table.writeRateLimiter, times(1)).throttle(anyString()); + } + + @Test + public void testDelete() throws Exception { + doTestDelete(true, false); + } + + @Test + public void testDeleteAsync() throws Exception { + doTestDelete(false, false); + } + + @Test(expected = ExecutionException.class) + public void testDeleteAsyncError() throws Exception { + doTestDelete(false, true); + } + + private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception { + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + Map<String, String> res = new HashMap<>(); + res.put("foo1", "bar1"); + if (!partial) { + res.put("foo2", "bar2"); + } + CompletableFuture<Map<String, String>> future; + if (error) { + future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("Test exception")); + } else { + future = CompletableFuture.completedFuture(res); + } + // Sync is backed by async so needs to mock the async method + doReturn(future).when(readFn).getAllAsync(any()); + RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null); + Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2")) + : table.getAllAsync(Arrays.asList("foo1", "foo2")).get()); + verify(table.readRateLimiter, times(1)).throttle(anyCollection()); + } + + @Test + public void testGetAll() throws Exception { + doTestGetAll(true, false, false); + } + + @Test + public void testGetAllAsync() throws Exception { + doTestGetAll(false, false, false); + } + + @Test(expected = ExecutionException.class) + public void testGetAllAsyncError() throws Exception { + doTestGetAll(false, true, false); + } + + // Partial result is an acceptable scenario + @Test + public void testGetAllPartialResult() throws Exception { + doTestGetAll(false, false, true); + } + + public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception { + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete, + mock(TableReadFunction.class), writeFn); + CompletableFuture<Void> future; + if (error) { + future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("Test exception")); + } else { + future = CompletableFuture.completedFuture(null); + } + // Sync is backed by async so needs to mock the async method + doReturn(future).when(writeFn).putAllAsync(any()); + if (hasDelete) { + doReturn(future).when(writeFn).deleteAllAsync(any()); + } + List<Entry<String, String>> entries = Arrays.asList( + new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2")); + ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class); + if (sync) { + table.putAll(entries); + } else { + table.putAllAsync(entries).get(); + } + verify(writeFn, times(1)).putAllAsync(argCaptor.capture()); + if (hasDelete) { + ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class); + verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture()); + Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue()); + Assert.assertEquals(1, argCaptor.getValue().size()); + Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey()); + verify(table.writeRateLimiter, times(1)).throttle(anyCollection()); + } else { + Assert.assertEquals(entries, argCaptor.getValue()); + } + verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection()); + } + + @Test + public void testPutAll() throws Exception { + doTestPutAll(true, false, false); + } + + @Test + public void testPutAllHasDelete() throws Exception { + doTestPutAll(true, false, true); + } + + @Test + public void testPutAllAsync() throws Exception { + doTestPutAll(false, false, false); + } + + @Test + public void testPutAllAsyncHasDelete() throws Exception { + doTestPutAll(false, false, true); + } + + @Test(expected = ExecutionException.class) + public void testPutAllAsyncError() throws Exception { + doTestPutAll(false, true, false); + } + + public void doTestDeleteAll(boolean sync, boolean error) throws Exception { + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error, + mock(TableReadFunction.class), writeFn); + CompletableFuture<Void> future; + if (error) { + future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("Test exception")); + } else { + future = CompletableFuture.completedFuture(null); + } + // Sync is backed by async so needs to mock the async method + doReturn(future).when(writeFn).deleteAllAsync(any()); + List<String> keys = Arrays.asList("foo1", "foo2"); + ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class); + if (sync) { + table.deleteAll(keys); + } else { + table.deleteAllAsync(keys).get(); + } + verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture()); + Assert.assertEquals(keys, argCaptor.getValue()); + verify(table.writeRateLimiter, times(1)).throttle(anyCollection()); + } + + @Test + public void testDeleteAll() throws Exception { + doTestDeleteAll(true, false); + } + + @Test + public void testDeleteAllAsync() throws Exception { + doTestDeleteAll(false, false); + } + + @Test(expected = ExecutionException.class) + public void testDeleteAllAsyncError() throws Exception { + doTestDeleteAll(false, true); + } + + @Test + public void testFlush() { + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + RemoteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn); + table.flush(); + verify(writeFn, times(1)).flush(); + } + + @Test + public void testGetWithCallbackExecutor() throws Exception { + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + // Sync is backed by async so needs to mock the async method + doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString()); + RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null, + Executors.newSingleThreadExecutor()); + Thread testThread = Thread.currentThread(); + + table.getAsync("foo").thenAccept(result -> { + Assert.assertEquals("bar", result); + // Must be executed on the executor thread + Assert.assertNotSame(testThread, Thread.currentThread()); + }); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java index 3d8e36f..907242f 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java @@ -36,17 +36,17 @@ import org.apache.samza.metrics.Timer; import org.apache.samza.table.Table; import org.apache.samza.table.descriptors.RemoteTableDescriptor; import org.apache.samza.table.descriptors.TableDescriptor; -import org.apache.samza.table.remote.RemoteReadWriteTable; +import org.apache.samza.table.remote.RemoteTable; import org.apache.samza.table.remote.RemoteTableProvider; import org.apache.samza.table.remote.TableRateLimiter; import org.apache.samza.table.remote.TableReadFunction; import org.apache.samza.table.remote.TableWriteFunction; - import org.apache.samza.table.retry.RetriableReadFunction; import org.apache.samza.table.retry.RetriableWriteFunction; import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; + import org.junit.Assert; import org.junit.Test; @@ -207,8 +207,8 @@ public class TestRemoteTableDescriptor { RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId()); provider.init(createMockContext(desc)); Table table = provider.getTable(); - Assert.assertTrue(table instanceof RemoteReadWriteTable); - RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table; + Assert.assertTrue(table instanceof RemoteTable); + RemoteTable rwTable = (RemoteTable) table; if (numRateLimitOps > 0) { Assert.assertTrue(!rlGets || rwTable.getReadRateLimiter() != null); Assert.assertTrue(!rlPuts || rwTable.getWriteRateLimiter() != null); http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java index 5116cab..050ea55 100644 --- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -35,7 +35,7 @@ import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.Table; import org.apache.samza.table.remote.TableReadFunction; import org.apache.samza.table.remote.TableWriteFunction; -import org.apache.samza.table.remote.TestRemoteReadWriteTable; +import org.apache.samza.table.remote.TestRemoteTable; import org.apache.samza.table.utils.TableMetricsUtil; import org.junit.Test; @@ -54,7 +54,7 @@ public class TestRetriableTableFunctions { public TableMetricsUtil getMetricsUtil(String tableId) { Table table = mock(Table.class); - Context context = TestRemoteReadWriteTable.getMockContext(); + Context context = TestRemoteTable.getMockContext(); return new TableMetricsUtil(context, table, tableId); } http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java index ddb79ba..fc9ce76 100644 --- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java @@ -19,7 +19,7 @@ package org.apache.samza.storage.kv.inmemory; import java.util.Map; -import junit.framework.Assert; + import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; @@ -28,7 +28,9 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.storage.kv.LocalTableProviderFactory; import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor; + import org.junit.Test; +import org.junit.Assert; public class TestInMemoryTableDescriptor { http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java index 62fb3da..319fb0f 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java @@ -19,7 +19,7 @@ package org.apache.samza.storage.kv.descriptors; import java.util.Map; -import junit.framework.Assert; + import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; @@ -30,7 +30,9 @@ import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.LocalTableProviderFactory; import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory; + import org.junit.Test; +import org.junit.Assert; public class TestRocksDbTableDescriptor { http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java deleted file mode 100644 index eae6bb0..0000000 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.storage.kv; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.ReadWriteTable; - -import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; -import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; - - -/** - * A store backed readable and writable table - * - * @param <K> the type of the key in this table - * @param <V> the type of the value in this table - */ -public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> - implements ReadWriteTable<K, V> { - - /** - * Constructs an instance of {@link LocalReadWriteTable} - * @param tableId the table Id - * @param kvStore the backing store - */ - public LocalReadWriteTable(String tableId, KeyValueStore kvStore) { - super(tableId, kvStore); - } - - @Override - public void put(K key, V value) { - if (value != null) { - instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value)); - } else { - delete(key); - } - } - - @Override - public CompletableFuture<Void> putAsync(K key, V value) { - CompletableFuture<Void> future = new CompletableFuture(); - try { - put(key, value); - future.complete(null); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; - } - - @Override - public void putAll(List<Entry<K, V>> entries) { - List<Entry<K, V>> toPut = new LinkedList<>(); - List<K> toDelete = new LinkedList<>(); - entries.forEach(e -> { - if (e.getValue() != null) { - toPut.add(e); - } else { - toDelete.add(e.getKey()); - } - }); - - if (!toPut.isEmpty()) { - instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(toPut)); - } - - if (!toDelete.isEmpty()) { - deleteAll(toDelete); - } - } - - @Override - public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) { - CompletableFuture<Void> future = new CompletableFuture(); - try { - putAll(entries); - future.complete(null); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; - } - - @Override - public void delete(K key) { - instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key)); - } - - @Override - public CompletableFuture<Void> deleteAsync(K key) { - CompletableFuture<Void> future = new CompletableFuture(); - try { - delete(key); - future.complete(null); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; - } - - @Override - public void deleteAll(List<K> keys) { - instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys)); - } - - @Override - public CompletableFuture<Void> deleteAllAsync(List<K> keys) { - CompletableFuture<Void> future = new CompletableFuture(); - try { - deleteAll(keys); - future.complete(null); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; - } - - @Override - public void flush() { - instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush()); - } - - private interface Func0 { - void apply(); - } - - private void instrument(Counter counter, Timer timer, Func0 func) { - incCounter(counter); - long startNs = clock.nanoTime(); - func.apply(); - updateTimer(timer, clock.nanoTime() - startNs); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java deleted file mode 100644 index 29ddb15..0000000 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.storage.kv; - -import com.google.common.base.Preconditions; - -import com.google.common.base.Supplier; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; - -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.BaseReadableTable; - -import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; -import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; - -/** - * A store backed readable table - * - * @param <K> the type of the key in this table - * @param <V> the type of the value in this table - */ -public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> { - - protected final KeyValueStore<K, V> kvStore; - - /** - * Constructs an instance of {@link LocalReadableTable} - * @param tableId the table Id - * @param kvStore the backing store - */ - public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) { - super(tableId); - Preconditions.checkNotNull(kvStore, "null KeyValueStore"); - this.kvStore = kvStore; - } - - @Override - public V get(K key) { - V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key)); - if (result == null) { - incCounter(readMetrics.numMissedLookups); - } - return result; - } - - @Override - public CompletableFuture<V> getAsync(K key) { - CompletableFuture<V> future = new CompletableFuture(); - try { - future.complete(get(key)); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; - } - - @Override - public Map<K, V> getAll(List<K> keys) { - Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys)); - result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups)); - return result; - } - - @Override - public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) { - CompletableFuture<Map<K, V>> future = new CompletableFuture(); - try { - future.complete(getAll(keys)); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; - } - - @Override - public void close() { - // The KV store is not closed here as it may still be needed by downstream operators, - // it will be closed by the SamzaContainer - } - - private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) { - incCounter(counter); - long startNs = clock.nanoTime(); - T result = func.get(); - updateTimer(timer, clock.nanoTime() - startNs); - return result; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java new file mode 100644 index 0000000..d9767b6 --- /dev/null +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.BaseReadWriteTable; + +import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; +import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; + + +/** + * A store backed readable and writable table + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +public class LocalTable<K, V> extends BaseReadWriteTable<K, V> { + + protected final KeyValueStore<K, V> kvStore; + + /** + * Constructs an instance of {@link LocalTable} + * @param tableId the table Id + * @param kvStore the backing store + */ + public LocalTable(String tableId, KeyValueStore kvStore) { + super(tableId); + Preconditions.checkNotNull(kvStore, "null KeyValueStore"); + this.kvStore = kvStore; + } + + @Override + public V get(K key) { + V result = instrument(metrics.numGets, metrics.getNs, () -> kvStore.get(key)); + if (result == null) { + incCounter(metrics.numMissedLookups); + } + return result; + } + + @Override + public CompletableFuture<V> getAsync(K key) { + CompletableFuture<V> future = new CompletableFuture(); + try { + future.complete(get(key)); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public Map<K, V> getAll(List<K> keys) { + Map<K, V> result = instrument(metrics.numGetAlls, metrics.getAllNs, () -> kvStore.getAll(keys)); + result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups)); + return result; + } + + @Override + public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) { + CompletableFuture<Map<K, V>> future = new CompletableFuture(); + try { + future.complete(getAll(keys)); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public void put(K key, V value) { + if (value != null) { + instrument(metrics.numPuts, metrics.putNs, () -> kvStore.put(key, value)); + } else { + delete(key); + } + } + + @Override + public CompletableFuture<Void> putAsync(K key, V value) { + CompletableFuture<Void> future = new CompletableFuture(); + try { + put(key, value); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + List<Entry<K, V>> toPut = new LinkedList<>(); + List<K> toDelete = new LinkedList<>(); + entries.forEach(e -> { + if (e.getValue() != null) { + toPut.add(e); + } else { + toDelete.add(e.getKey()); + } + }); + + if (!toPut.isEmpty()) { + instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(toPut)); + } + + if (!toDelete.isEmpty()) { + deleteAll(toDelete); + } + } + + @Override + public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) { + CompletableFuture<Void> future = new CompletableFuture(); + try { + putAll(entries); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public void delete(K key) { + instrument(metrics.numDeletes, metrics.deleteNs, () -> kvStore.delete(key)); + } + + @Override + public CompletableFuture<Void> deleteAsync(K key) { + CompletableFuture<Void> future = new CompletableFuture(); + try { + delete(key); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public void deleteAll(List<K> keys) { + instrument(metrics.numDeleteAlls, metrics.deleteAllNs, () -> kvStore.deleteAll(keys)); + } + + @Override + public CompletableFuture<Void> deleteAllAsync(List<K> keys) { + CompletableFuture<Void> future = new CompletableFuture(); + try { + deleteAll(keys); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public void flush() { + instrument(metrics.numFlushes, metrics.flushNs, () -> kvStore.flush()); + } + + @Override + public void close() { + // The KV store is not closed here as it may still be needed by downstream operators, + // it will be closed by the SamzaContainer + } + + private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) { + incCounter(counter); + long startNs = clock.nanoTime(); + T result = func.get(); + updateTimer(timer, clock.nanoTime() - startNs); + return result; + } + + private interface Func0 { + void apply(); + } + + private void instrument(Counter counter, Timer timer, Func0 func) { + incCounter(counter); + long startNs = clock.nanoTime(); + func.apply(); + updateTimer(timer, clock.nanoTime() - startNs); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java index 3be61d0..5099a7e 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java @@ -20,8 +20,7 @@ package org.apache.samza.storage.kv; import com.google.common.base.Preconditions; import org.apache.samza.context.Context; -import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.Table; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.BaseTableProvider; /** @@ -53,10 +52,10 @@ public class LocalTableProvider extends BaseTableProvider { } @Override - public Table getTable() { + public ReadWriteTable getTable() { Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId)); Preconditions.checkNotNull(kvStore, "Store not initialized for table " + tableId); - ReadableTable table = new LocalReadWriteTable(tableId, kvStore); + ReadWriteTable table = new LocalTable(tableId, kvStore); table.init(this.context); return table; }
