http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java deleted file mode 100644 index 044fab4..0000000 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.storage.kv; - -import java.util.Arrays; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.MetricsConfig; -import org.apache.samza.context.ContainerContext; -import org.apache.samza.context.Context; -import org.apache.samza.context.JobContext; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.ReadWriteTable; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import static org.mockito.Mockito.*; - - -public class TestLocalReadWriteTable { - - public static final String TABLE_ID = "t1"; - - private Timer putNs; - private Timer putAllNs; - private Timer deleteNs; - private Timer deleteAllNs; - private Timer flushNs; - private Counter numPuts; - private Counter numPutAlls; - private Counter numDeletes; - private Counter numDeleteAlls; - private Counter numFlushes; - private Timer putCallbackNs; - private Timer deleteCallbackNs; - - private MetricsRegistry metricsRegistry; - - private KeyValueStore kvStore; - - @Before - public void setUp() { - - putNs = new Timer(""); - putAllNs = new Timer(""); - deleteNs = new Timer(""); - deleteAllNs = new Timer(""); - flushNs = new Timer(""); - numPuts = new Counter(""); - numPutAlls = new Counter(""); - numDeletes = new Counter(""); - numDeleteAlls = new Counter(""); - numFlushes = new Counter(""); - putCallbackNs = new Timer(""); - deleteCallbackNs = new Timer(""); - - metricsRegistry = mock(MetricsRegistry.class); - String groupName = LocalReadWriteTable.class.getSimpleName(); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs); - - kvStore = mock(KeyValueStore.class); - } - - @Test - public void testPut() throws Exception { - ReadWriteTable table = createTable(false); - table.put("k1", "v1"); - table.putAsync("k2", "v2").get(); - table.putAsync("k3", null).get(); - verify(kvStore, times(2)).put(any(), any()); - verify(kvStore, times(1)).delete(any()); - Assert.assertEquals(2, numPuts.getCount()); - Assert.assertEquals(1, numDeletes.getCount()); - Assert.assertTrue(putNs.getSnapshot().getAverage() > 0); - Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0); - Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, numPutAlls.getCount()); - Assert.assertEquals(0, numDeleteAlls.getCount()); - Assert.assertEquals(0, numFlushes.getCount()); - Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); - } - - @Test - public void testPutAll() throws Exception { - ReadWriteTable table = createTable(false); - List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null)); - table.putAll(entries); - table.putAllAsync(entries).get(); - verify(kvStore, times(2)).putAll(any()); - verify(kvStore, times(2)).deleteAll(any()); - Assert.assertEquals(2, numPutAlls.getCount()); - Assert.assertEquals(2, numDeleteAlls.getCount()); - Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0); - Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0); - Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, numPuts.getCount()); - Assert.assertEquals(0, numDeletes.getCount()); - Assert.assertEquals(0, numFlushes.getCount()); - Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); - } - - @Test - public void testDelete() throws Exception { - ReadWriteTable table = createTable(false); - table.delete(""); - table.deleteAsync("").get(); - verify(kvStore, times(2)).delete(any()); - Assert.assertEquals(2, numDeletes.getCount()); - Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0); - Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, numPuts.getCount()); - Assert.assertEquals(0, numPutAlls.getCount()); - Assert.assertEquals(0, numDeleteAlls.getCount()); - Assert.assertEquals(0, numFlushes.getCount()); - Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); - } - - @Test - public void testDeleteAll() throws Exception { - ReadWriteTable table = createTable(false); - table.deleteAll(Collections.emptyList()); - table.deleteAllAsync(Collections.emptyList()).get(); - verify(kvStore, times(2)).deleteAll(any()); - Assert.assertEquals(2, numDeleteAlls.getCount()); - Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0); - Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, numPuts.getCount()); - Assert.assertEquals(0, numPutAlls.getCount()); - Assert.assertEquals(0, numDeletes.getCount()); - Assert.assertEquals(0, numFlushes.getCount()); - Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); - } - - @Test - public void testFlush() { - ReadWriteTable table = createTable(false); - table.flush(); - table.flush(); - verify(kvStore, times(2)).flush(); - Assert.assertEquals(2, numFlushes.getCount()); - Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0); - Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, numPuts.getCount()); - Assert.assertEquals(0, numPutAlls.getCount()); - Assert.assertEquals(0, numDeletes.getCount()); - Assert.assertEquals(0, numDeleteAlls.getCount()); - Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); - } - - @Test - public void testTimerDisabled() throws Exception { - ReadWriteTable table = createTable(true); - table.put("", ""); - table.putAsync("", "").get(); - table.putAll(Collections.emptyList()); - table.putAllAsync(Collections.emptyList()).get(); - table.delete(""); - table.deleteAsync("").get(); - table.deleteAll(Collections.emptyList()); - table.deleteAllAsync(Collections.emptyList()).get(); - table.flush(); - Assert.assertEquals(1, numFlushes.getCount()); - Assert.assertEquals(2, numPuts.getCount()); - Assert.assertEquals(0, numPutAlls.getCount()); - Assert.assertEquals(2, numDeletes.getCount()); - Assert.assertEquals(2, numDeleteAlls.getCount()); - Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); - } - - private LocalReadWriteTable createTable(boolean isTimerDisabled) { - Map<String, String> config = new HashMap<>(); - if (isTimerDisabled) { - config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); - } - Context context = mock(Context.class); - JobContext jobContext = mock(JobContext.class); - when(context.getJobContext()).thenReturn(jobContext); - when(jobContext.getConfig()).thenReturn(new MapConfig(config)); - ContainerContext containerContext = mock(ContainerContext.class); - when(context.getContainerContext()).thenReturn(containerContext); - when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); - - LocalReadWriteTable table = new LocalReadWriteTable("t1", kvStore); - table.init(context); - - return table; - } -}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java deleted file mode 100644 index e1c82d9..0000000 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.storage.kv; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.MetricsConfig; -import org.apache.samza.context.ContainerContext; -import org.apache.samza.context.Context; -import org.apache.samza.context.JobContext; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.ReadableTable; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import static org.mockito.Mockito.*; - -public class TestLocalReadableTable { - - public static final String TABLE_ID = "t1"; - - private List<String> keys; - private Map<String, String> values; - - private Timer getNs; - private Timer getAllNs; - private Counter numGets; - private Counter numGetAlls; - private Timer getCallbackNs; - private Counter numMissedLookups; - - private MetricsRegistry metricsRegistry; - - private KeyValueStore kvStore; - - @Before - public void setUp() { - keys = Arrays.asList("k1", "k2", "k3"); - - values = new HashMap<>(); - values.put("k1", "v1"); - values.put("k2", "v2"); - values.put("k3", null); - - kvStore = mock(KeyValueStore.class); - when(kvStore.get("k1")).thenReturn("v1"); - when(kvStore.get("k2")).thenReturn("v2"); - when(kvStore.getAll(keys)).thenReturn(values); - - getNs = new Timer(""); - getAllNs = new Timer(""); - numGets = new Counter(""); - numGetAlls = new Counter(""); - getCallbackNs = new Timer(""); - numMissedLookups = new Counter(""); - - metricsRegistry = mock(MetricsRegistry.class); - String groupName = LocalReadableTable.class.getSimpleName(); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls); - when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs); - when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs); - } - - @Test - public void testGet() throws Exception { - ReadableTable table = createTable(false); - Assert.assertEquals("v1", table.get("k1")); - Assert.assertEquals("v2", table.getAsync("k2").get()); - Assert.assertNull(table.get("k3")); - verify(kvStore, times(3)).get(any()); - Assert.assertEquals(3, numGets.getCount()); - Assert.assertEquals(1, numMissedLookups.getCount()); - Assert.assertTrue(getNs.getSnapshot().getAverage() > 0); - Assert.assertEquals(0, numGetAlls.getCount()); - Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); - } - - @Test - public void testGetAll() throws Exception { - ReadableTable table = createTable(false); - Assert.assertEquals(values, table.getAll(keys)); - Assert.assertEquals(values, table.getAllAsync(keys).get()); - verify(kvStore, times(2)).getAll(any()); - Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList())); - Assert.assertEquals(2, numMissedLookups.getCount()); - Assert.assertEquals(3, numGetAlls.getCount()); - Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0); - Assert.assertEquals(0, numGets.getCount()); - Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); - } - - @Test - public void testTimerDisabled() throws Exception { - ReadableTable table = createTable(true); - table.get(""); - table.getAsync("").get(); - table.getAll(keys); - table.getAllAsync(keys).get(); - Assert.assertEquals(2, numGets.getCount()); - Assert.assertEquals(4, numMissedLookups.getCount()); - Assert.assertEquals(2, numGetAlls.getCount()); - Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); - } - - private LocalReadableTable createTable(boolean isTimerDisabled) { - Map<String, String> config = new HashMap<>(); - if (isTimerDisabled) { - config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); - } - Context context = mock(Context.class); - JobContext jobContext = mock(JobContext.class); - when(context.getJobContext()).thenReturn(jobContext); - when(jobContext.getConfig()).thenReturn(new MapConfig(config)); - ContainerContext containerContext = mock(ContainerContext.class); - when(context.getContainerContext()).thenReturn(containerContext); - when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); - - LocalReadableTable table = new LocalReadableTable("t1", kvStore); - table.init(context); - - return table; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java index 5367931..263ab56 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java @@ -19,7 +19,6 @@ package org.apache.samza.storage.kv; -import junit.framework.Assert; import org.apache.samza.config.MapConfig; import org.apache.samza.context.ContainerContext; import org.apache.samza.context.Context; @@ -28,6 +27,7 @@ import org.apache.samza.context.TaskContext; import org.apache.samza.table.TableProvider; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Test; +import org.junit.Assert; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java new file mode 100644 index 0000000..0fd4539 --- /dev/null +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.Context; +import org.apache.samza.context.JobContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; + +import org.apache.samza.table.ReadWriteTable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class TestLocalTableRead { + + public static final String TABLE_ID = "t1"; + + private List<String> keys; + private Map<String, String> values; + + private Timer getNs; + private Timer getAllNs; + private Counter numGets; + private Counter numGetAlls; + private Timer getCallbackNs; + private Counter numMissedLookups; + + private MetricsRegistry metricsRegistry; + + private KeyValueStore kvStore; + + @Before + public void setUp() { + keys = Arrays.asList("k1", "k2", "k3"); + + values = new HashMap<>(); + values.put("k1", "v1"); + values.put("k2", "v2"); + values.put("k3", null); + + kvStore = mock(KeyValueStore.class); + when(kvStore.get("k1")).thenReturn("v1"); + when(kvStore.get("k2")).thenReturn("v2"); + when(kvStore.getAll(keys)).thenReturn(values); + + getNs = new Timer(""); + getAllNs = new Timer(""); + numGets = new Counter(""); + numGetAlls = new Counter(""); + getCallbackNs = new Timer(""); + numMissedLookups = new Counter(""); + + metricsRegistry = mock(MetricsRegistry.class); + String groupName = LocalTable.class.getSimpleName(); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs); + } + + @Test + public void testGet() throws Exception { + ReadWriteTable table = createTable(false); + Assert.assertEquals("v1", table.get("k1")); + Assert.assertEquals("v2", table.getAsync("k2").get()); + Assert.assertNull(table.get("k3")); + verify(kvStore, times(3)).get(any()); + Assert.assertEquals(3, numGets.getCount()); + Assert.assertEquals(1, numMissedLookups.getCount()); + Assert.assertTrue(getNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, numGetAlls.getCount()); + Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testGetAll() throws Exception { + ReadWriteTable table = createTable(false); + Assert.assertEquals(values, table.getAll(keys)); + Assert.assertEquals(values, table.getAllAsync(keys).get()); + verify(kvStore, times(2)).getAll(any()); + Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList())); + Assert.assertEquals(2, numMissedLookups.getCount()); + Assert.assertEquals(3, numGetAlls.getCount()); + Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, numGets.getCount()); + Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testTimerDisabled() throws Exception { + ReadWriteTable table = createTable(true); + table.get(""); + table.getAsync("").get(); + table.getAll(keys); + table.getAllAsync(keys).get(); + Assert.assertEquals(2, numGets.getCount()); + Assert.assertEquals(4, numMissedLookups.getCount()); + Assert.assertEquals(2, numGetAlls.getCount()); + Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); + } + + private LocalTable createTable(boolean isTimerDisabled) { + Map<String, String> config = new HashMap<>(); + if (isTimerDisabled) { + config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + } + Context context = mock(Context.class); + JobContext jobContext = mock(JobContext.class); + when(context.getJobContext()).thenReturn(jobContext); + when(jobContext.getConfig()).thenReturn(new MapConfig(config)); + ContainerContext containerContext = mock(ContainerContext.class); + when(context.getContainerContext()).thenReturn(containerContext); + when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); + + LocalTable table = new LocalTable("t1", kvStore); + table.init(context); + + return table; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java new file mode 100644 index 0000000..80eb99f --- /dev/null +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import java.util.Arrays; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.Context; +import org.apache.samza.context.JobContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.ReadWriteTable; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +public class TestLocalTableWrite { + + public static final String TABLE_ID = "t1"; + + private Timer putNs; + private Timer putAllNs; + private Timer deleteNs; + private Timer deleteAllNs; + private Timer flushNs; + private Counter numPuts; + private Counter numPutAlls; + private Counter numDeletes; + private Counter numDeleteAlls; + private Counter numFlushes; + private Timer putCallbackNs; + private Timer deleteCallbackNs; + + private MetricsRegistry metricsRegistry; + + private KeyValueStore kvStore; + + @Before + public void setUp() { + + putNs = new Timer(""); + putAllNs = new Timer(""); + deleteNs = new Timer(""); + deleteAllNs = new Timer(""); + flushNs = new Timer(""); + numPuts = new Counter(""); + numPutAlls = new Counter(""); + numDeletes = new Counter(""); + numDeleteAlls = new Counter(""); + numFlushes = new Counter(""); + putCallbackNs = new Timer(""); + deleteCallbackNs = new Timer(""); + + metricsRegistry = mock(MetricsRegistry.class); + String groupName = LocalTable.class.getSimpleName(); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs); + + kvStore = mock(KeyValueStore.class); + } + + @Test + public void testPut() throws Exception { + ReadWriteTable table = createTable(false); + table.put("k1", "v1"); + table.putAsync("k2", "v2").get(); + table.putAsync("k3", null).get(); + verify(kvStore, times(2)).put(any(), any()); + verify(kvStore, times(1)).delete(any()); + Assert.assertEquals(2, numPuts.getCount()); + Assert.assertEquals(1, numDeletes.getCount()); + Assert.assertTrue(putNs.getSnapshot().getAverage() > 0); + Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeleteAlls.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testPutAll() throws Exception { + ReadWriteTable table = createTable(false); + List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null)); + table.putAll(entries); + table.putAllAsync(entries).get(); + verify(kvStore, times(2)).putAll(any()); + verify(kvStore, times(2)).deleteAll(any()); + Assert.assertEquals(2, numPutAlls.getCount()); + Assert.assertEquals(2, numDeleteAlls.getCount()); + Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0); + Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numDeletes.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testDelete() throws Exception { + ReadWriteTable table = createTable(false); + table.delete(""); + table.deleteAsync("").get(); + verify(kvStore, times(2)).delete(any()); + Assert.assertEquals(2, numDeletes.getCount()); + Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeleteAlls.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testDeleteAll() throws Exception { + ReadWriteTable table = createTable(false); + table.deleteAll(Collections.emptyList()); + table.deleteAllAsync(Collections.emptyList()).get(); + verify(kvStore, times(2)).deleteAll(any()); + Assert.assertEquals(2, numDeleteAlls.getCount()); + Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeletes.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testFlush() { + ReadWriteTable table = createTable(false); + table.flush(); + table.flush(); + verify(kvStore, times(2)).flush(); + Assert.assertEquals(2, numFlushes.getCount()); + Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeletes.getCount()); + Assert.assertEquals(0, numDeleteAlls.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testTimerDisabled() throws Exception { + ReadWriteTable table = createTable(true); + table.put("", ""); + table.putAsync("", "").get(); + table.putAll(Collections.emptyList()); + table.putAllAsync(Collections.emptyList()).get(); + table.delete(""); + table.deleteAsync("").get(); + table.deleteAll(Collections.emptyList()); + table.deleteAllAsync(Collections.emptyList()).get(); + table.flush(); + Assert.assertEquals(1, numFlushes.getCount()); + Assert.assertEquals(2, numPuts.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(2, numDeletes.getCount()); + Assert.assertEquals(2, numDeleteAlls.getCount()); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + private LocalTable createTable(boolean isTimerDisabled) { + Map<String, String> config = new HashMap<>(); + if (isTimerDisabled) { + config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + } + Context context = mock(Context.class); + JobContext jobContext = mock(JobContext.class); + when(context.getJobContext()).thenReturn(jobContext); + when(jobContext.getConfig()).thenReturn(new MapConfig(config)); + ContainerContext containerContext = mock(ContainerContext.class); + when(context.getContainerContext()).thenReturn(containerContext); + when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); + + LocalTable table = new LocalTable("t1", kvStore); + table.init(context); + + return table; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index 2137a46..ab650f2 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -229,7 +229,7 @@ public class StreamTaskIntegrationTest { @Override public void init(Context context) throws Exception { - profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store"); + profileViewTable = context.getTaskContext().getTable("profile-view-store"); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java deleted file mode 100644 index b447493..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.table; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.TaskApplication; -import org.apache.samza.application.descriptors.StreamApplicationDescriptor; -import org.apache.samza.application.descriptors.TaskApplicationDescriptor; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.JobCoordinatorConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.TaskConfig; -import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; -import org.apache.samza.context.Context; -import org.apache.samza.system.descriptors.GenericInputDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.system.descriptors.DelegatingSystemDescriptor; -import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.IntegerSerde; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; -import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.Table; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.StreamTaskFactory; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.test.harness.AbstractIntegrationTestHarness; -import org.apache.samza.test.util.ArraySystemFactory; -import org.apache.samza.test.util.Base64Serializer; - -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.samza.test.table.TestTableData.EnrichedPageView; -import static org.apache.samza.test.table.TestTableData.PageView; -import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde; -import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory; -import static org.apache.samza.test.table.TestTableData.Profile; -import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - - -/** - * This test class tests sendTo() and join() for local tables - */ -public class TestLocalTable extends AbstractIntegrationTestHarness { - - @Test - public void testSendTo() throws Exception { - - int count = 10; - Profile[] profiles = TestTableData.generateProfiles(count); - - int partitionCount = 4; - Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); - - configs.put("streams.Profile.samza.system", "test"); - configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); - configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); - - MyMapFunction mapFn = new MyMapFunction(); - - final StreamApplication app = appDesc -> { - - Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1", - KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); - DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); - - appDesc.getInputStream(isd) - .map(mapFn) - .sendTo(table); - }; - - Config config = new MapConfig(configs); - final LocalApplicationRunner runner = new LocalApplicationRunner(app, config); - executeRun(runner, config); - runner.waitForFinish(); - - for (int i = 0; i < partitionCount; i++) { - MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i)); - assertEquals(count, mapFnCopy.received.size()); - mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null)); - } - } - - static class StreamTableJoinApp implements StreamApplication { - static List<PageView> received = new LinkedList<>(); - static List<EnrichedPageView> joined = new LinkedList<>(); - - @Override - public void describe(StreamApplicationDescriptor appDesc) { - Table<KV<Integer, Profile>> table = appDesc.getTable( - new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); - DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); - appDesc.getInputStream(profileISD) - .map(m -> new KV(m.getMemberId(), m)) - .sendTo(table); - - GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); - appDesc.getInputStream(pageViewISD) - .map(pv -> { - received.add(pv); - return pv; - }) - .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1") - .join(table, new PageViewToProfileJoinFunction()) - .sink((m, collector, coordinator) -> joined.add(m)); - } - } - - @Test - public void testStreamTableJoin() throws Exception { - - int count = 10; - PageView[] pageViews = TestTableData.generatePageViews(count); - Profile[] profiles = TestTableData.generateProfiles(count); - - int partitionCount = 4; - Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); - - configs.put("streams.PageView.samza.system", "test"); - configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews)); - configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount)); - - configs.put("streams.Profile.samza.system", "test"); - configs.put("streams.Profile.samza.bootstrap", "true"); - configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); - configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); - - Config config = new MapConfig(configs); - final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config); - executeRun(runner, config); - runner.waitForFinish(); - - assertEquals(count * partitionCount, StreamTableJoinApp.received.size()); - assertEquals(count * partitionCount, StreamTableJoinApp.joined.size()); - assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView); - } - - static class DualStreamTableJoinApp implements StreamApplication { - static List<Profile> sentToProfileTable1 = new LinkedList<>(); - static List<Profile> sentToProfileTable2 = new LinkedList<>(); - static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>(); - static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>(); - - @Override - public void describe(StreamApplicationDescriptor appDesc) { - KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()); - KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()); - - PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction(); - PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction(); - - Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde)); - - DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>()); - GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>()); - MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1); - MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2); - - profileStream1 - .map(m -> { - sentToProfileTable1.add(m); - return new KV(m.getMemberId(), m); - }) - .sendTo(profileTable); - profileStream2 - .map(m -> { - sentToProfileTable2.add(m); - return new KV(m.getMemberId(), m); - }) - .sendTo(profileTable); - - GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>()); - GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>()); - MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1); - MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2); - - pageViewStream1 - .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") - .join(profileTable, joinFn1) - .sink((m, collector, coordinator) -> joinedPageViews1.add(m)); - - pageViewStream2 - .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2") - .join(profileTable, joinFn2) - .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); - } - } - - @Test - public void testDualStreamTableJoin() throws Exception { - - int count = 10; - PageView[] pageViews = TestTableData.generatePageViews(count); - Profile[] profiles = TestTableData.generateProfiles(count); - - int partitionCount = 4; - Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); - - configs.put("streams.Profile1.samza.system", "test"); - configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles)); - configs.put("streams.Profile1.samza.bootstrap", "true"); - configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount)); - - configs.put("streams.Profile2.samza.system", "test"); - configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles)); - configs.put("streams.Profile2.samza.bootstrap", "true"); - configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount)); - - configs.put("streams.PageView1.samza.system", "test"); - configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews)); - configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount)); - - configs.put("streams.PageView2.samza.system", "test"); - configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews)); - configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount)); - - Config config = new MapConfig(configs); - final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config); - executeRun(runner, config); - runner.waitForFinish(); - - assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size()); - assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size()); - - assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size()); - assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size()); - assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView); - assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView); - } - - static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) { - Map<String, String> configs = new HashMap<>(); - configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName()); - - configs.put(JobConfig.JOB_NAME(), "test-table-job"); - configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); - configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); - - // For intermediate streams - configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); - configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl); - configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect); - configs.put("systems.kafka.samza.key.serde", "int"); - configs.put("systems.kafka.samza.msg.serde", "json"); - configs.put("systems.kafka.default.stream.replication.factor", "1"); - configs.put("job.default.system", "kafka"); - - configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory"); - configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); - - return configs; - } - - private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> { - - private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>(); - - private transient List<Profile> received; - private transient ReadableTable table; - - @Override - public void init(Context context) { - table = (ReadableTable) context.getTaskContext().getTable("t1"); - this.received = new ArrayList<>(); - - taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this); - } - - @Override - public KV<Integer, Profile> apply(Profile profile) { - received.add(profile); - return new KV(profile.getMemberId(), profile); - } - - public static MyMapFunction getMapFunctionByTask(String taskName) { - return taskToMapFunctionMap.get(taskName); - } - } - - @Test - public void testWithLowLevelApi() throws Exception { - - Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); - configs.put("streams.PageView.samza.system", "test"); - configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10))); - configs.put("streams.PageView.partitionCount", String.valueOf(4)); - configs.put("task.inputs", "test.PageView"); - - Config config = new MapConfig(configs); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config); - executeRun(runner, config); - runner.waitForFinish(); - } - - static public class MyTaskApplication implements TaskApplication { - @Override - public void describe(TaskApplicationDescriptor appDescriptor) { - DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); - appDescriptor - .withInputStream(pageViewISD) - .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()))) - .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask()); - } - } - - static public class MyStreamTask implements StreamTask, InitableTask { - private ReadWriteTable<Integer, PageView> pageViewTable; - @Override - public void init(Context context) throws Exception { - pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1"); - } - @Override - public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { - PageView pv = (PageView) message.getMessage(); - pageViewTable.put(pv.getMemberId(), pv); - PageView pv2 = pageViewTable.get(pv.getMemberId()); - Assert.assertEquals(pv.getMemberId(), pv2.getMemberId()); - Assert.assertEquals(pv.getPageKey(), pv2.getPageKey()); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java new file mode 100644 index 0000000..0303c26 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.table; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.TaskApplicationDescriptor; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.context.Context; +import org.apache.samza.system.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.system.descriptors.DelegatingSystemDescriptor; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.table.Table; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; +import org.apache.samza.test.util.ArraySystemFactory; +import org.apache.samza.test.util.Base64Serializer; + +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.samza.test.table.TestTableData.EnrichedPageView; +import static org.apache.samza.test.table.TestTableData.PageView; +import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde; +import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory; +import static org.apache.samza.test.table.TestTableData.Profile; +import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +/** + * This test class tests sendTo() and join() for local tables + */ +public class TestLocalTableEndToEnd extends AbstractIntegrationTestHarness { + + @Test + public void testSendTo() throws Exception { + + int count = 10; + Profile[] profiles = TestTableData.generateProfiles(count); + + int partitionCount = 4; + Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); + + configs.put("streams.Profile.samza.system", "test"); + configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); + + MyMapFunction mapFn = new MyMapFunction(); + + final StreamApplication app = appDesc -> { + + Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1", + KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); + + appDesc.getInputStream(isd) + .map(mapFn) + .sendTo(table); + }; + + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, config); + executeRun(runner, config); + runner.waitForFinish(); + + for (int i = 0; i < partitionCount; i++) { + MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i)); + assertEquals(count, mapFnCopy.received.size()); + mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null)); + } + } + + static class StreamTableJoinApp implements StreamApplication { + static List<PageView> received = new LinkedList<>(); + static List<EnrichedPageView> joined = new LinkedList<>(); + + @Override + public void describe(StreamApplicationDescriptor appDesc) { + Table<KV<Integer, Profile>> table = appDesc.getTable( + new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); + appDesc.getInputStream(profileISD) + .map(m -> new KV(m.getMemberId(), m)) + .sendTo(table); + + GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + appDesc.getInputStream(pageViewISD) + .map(pv -> { + received.add(pv); + return pv; + }) + .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1") + .join(table, new PageViewToProfileJoinFunction()) + .sink((m, collector, coordinator) -> joined.add(m)); + } + } + + @Test + public void testStreamTableJoin() throws Exception { + + int count = 10; + PageView[] pageViews = TestTableData.generatePageViews(count); + Profile[] profiles = TestTableData.generateProfiles(count); + + int partitionCount = 4; + Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); + + configs.put("streams.PageView.samza.system", "test"); + configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews)); + configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.Profile.samza.system", "test"); + configs.put("streams.Profile.samza.bootstrap", "true"); + configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); + + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config); + executeRun(runner, config); + runner.waitForFinish(); + + assertEquals(count * partitionCount, StreamTableJoinApp.received.size()); + assertEquals(count * partitionCount, StreamTableJoinApp.joined.size()); + assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView); + } + + static class DualStreamTableJoinApp implements StreamApplication { + static List<Profile> sentToProfileTable1 = new LinkedList<>(); + static List<Profile> sentToProfileTable2 = new LinkedList<>(); + static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>(); + static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>(); + + @Override + public void describe(StreamApplicationDescriptor appDesc) { + KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()); + KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()); + + PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction(); + PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction(); + + Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde)); + + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>()); + GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>()); + MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1); + MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2); + + profileStream1 + .map(m -> { + sentToProfileTable1.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + profileStream2 + .map(m -> { + sentToProfileTable2.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + + GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>()); + GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>()); + MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1); + MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2); + + pageViewStream1 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") + .join(profileTable, joinFn1) + .sink((m, collector, coordinator) -> joinedPageViews1.add(m)); + + pageViewStream2 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2") + .join(profileTable, joinFn2) + .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); + } + } + + @Test + public void testDualStreamTableJoin() throws Exception { + + int count = 10; + PageView[] pageViews = TestTableData.generatePageViews(count); + Profile[] profiles = TestTableData.generateProfiles(count); + + int partitionCount = 4; + Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); + + configs.put("streams.Profile1.samza.system", "test"); + configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile1.samza.bootstrap", "true"); + configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.Profile2.samza.system", "test"); + configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile2.samza.bootstrap", "true"); + configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.PageView1.samza.system", "test"); + configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews)); + configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.PageView2.samza.system", "test"); + configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews)); + configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount)); + + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config); + executeRun(runner, config); + runner.waitForFinish(); + + assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size()); + assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size()); + + assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size()); + assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size()); + assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView); + assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView); + } + + static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) { + Map<String, String> configs = new HashMap<>(); + configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName()); + + configs.put(JobConfig.JOB_NAME(), "test-table-job"); + configs.put(JobConfig.PROCESSOR_ID(), "1"); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + + // For intermediate streams + configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); + configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl); + configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect); + configs.put("systems.kafka.samza.key.serde", "int"); + configs.put("systems.kafka.samza.msg.serde", "json"); + configs.put("systems.kafka.default.stream.replication.factor", "1"); + configs.put("job.default.system", "kafka"); + + configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory"); + configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); + + return configs; + } + + private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> { + + private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>(); + + private transient List<Profile> received; + private transient ReadWriteTable table; + + @Override + public void init(Context context) { + table = context.getTaskContext().getTable("t1"); + this.received = new ArrayList<>(); + + taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this); + } + + @Override + public KV<Integer, Profile> apply(Profile profile) { + received.add(profile); + return new KV(profile.getMemberId(), profile); + } + + public static MyMapFunction getMapFunctionByTask(String taskName) { + return taskToMapFunctionMap.get(taskName); + } + } + + @Test + public void testWithLowLevelApi() throws Exception { + + Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); + configs.put("streams.PageView.samza.system", "test"); + configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10))); + configs.put("streams.PageView.partitionCount", String.valueOf(4)); + configs.put("task.inputs", "test.PageView"); + + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config); + executeRun(runner, config); + runner.waitForFinish(); + } + + static public class MyTaskApplication implements TaskApplication { + @Override + public void describe(TaskApplicationDescriptor appDescriptor) { + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + appDescriptor + .withInputStream(pageViewISD) + .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()))) + .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask()); + } + } + + static public class MyStreamTask implements StreamTask, InitableTask { + private ReadWriteTable<Integer, PageView> pageViewTable; + @Override + public void init(Context context) throws Exception { + pageViewTable = context.getTaskContext().getTable("t1"); + } + @Override + public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { + PageView pv = (PageView) message.getMessage(); + pageViewTable.put(pv.getMemberId(), pv); + PageView pv2 = pageViewTable.get(pv.getMemberId()); + Assert.assertEquals(pv.getMemberId(), pv2.getMemberId()); + Assert.assertEquals(pv.getPageKey(), pv2.getPageKey()); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java deleted file mode 100644 index 3de8300..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.table; - -import com.google.common.cache.CacheBuilder; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; - -import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.descriptors.StreamApplicationDescriptor; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.context.Context; -import org.apache.samza.context.MockContext; -import org.apache.samza.system.descriptors.GenericInputDescriptor; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.Timer; -import org.apache.samza.operators.KV; -import org.apache.samza.table.descriptors.TableDescriptor; -import org.apache.samza.system.descriptors.DelegatingSystemDescriptor; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.table.Table; -import org.apache.samza.table.descriptors.CachingTableDescriptor; -import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor; -import org.apache.samza.table.remote.RemoteReadWriteTable; -import org.apache.samza.table.remote.RemoteReadableTable; -import org.apache.samza.table.descriptors.RemoteTableDescriptor; -import org.apache.samza.table.remote.TableRateLimiter; -import org.apache.samza.table.remote.TableReadFunction; -import org.apache.samza.table.remote.TableWriteFunction; -import org.apache.samza.test.harness.AbstractIntegrationTestHarness; -import org.apache.samza.test.util.Base64Serializer; -import org.apache.samza.util.RateLimiter; - -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.samza.test.table.TestTableData.EnrichedPageView; -import static org.apache.samza.test.table.TestTableData.PageView; -import static org.apache.samza.test.table.TestTableData.Profile; -import static org.apache.samza.test.table.TestTableData.generatePageViews; -import static org.apache.samza.test.table.TestTableData.generateProfiles; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.withSettings; - - -public class TestRemoteTable extends AbstractIntegrationTestHarness { - - static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>(); - - static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> { - private final String serializedProfiles; - private transient Map<Integer, Profile> profileMap; - - private InMemoryReadFunction(String profiles) { - this.serializedProfiles = profiles; - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class); - this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity())); - } - - @Override - public CompletableFuture<Profile> getAsync(Integer key) { - return CompletableFuture.completedFuture(profileMap.get(key)); - } - - @Override - public boolean isRetriable(Throwable exception) { - return false; - } - - static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) { - return new InMemoryReadFunction(serializedProfiles); - } - } - - static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> { - private transient List<EnrichedPageView> records; - private String testName; - - public InMemoryWriteFunction(String testName) { - this.testName = testName; - } - - // Verify serializable functionality - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - // Write to the global list for verification - records = writtenRecords.get(testName); - } - - @Override - public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) { - records.add(record); - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture<Void> deleteAsync(Integer key) { - records.remove(key); - return CompletableFuture.completedFuture(null); - } - - @Override - public boolean isRetriable(Throwable exception) { - return false; - } - } - - private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) { - CachingTableDescriptor<K, V> cachingDesc; - if (defaultCache) { - cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc); - cachingDesc.withReadTtl(Duration.ofMinutes(5)); - cachingDesc.withWriteTtl(Duration.ofMinutes(5)); - } else { - GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id); - guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build()); - cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc); - } - - return appDesc.getTable(cachingDesc); - } - - static class MyReadFunction implements TableReadFunction { - @Override - public CompletableFuture getAsync(Object key) { - return null; - } - - @Override - public boolean isRetriable(Throwable exception) { - return false; - } - } - - private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception { - final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName); - - writtenRecords.put(testName, new ArrayList<>()); - - int count = 10; - PageView[] pageViews = generatePageViews(count); - String profiles = Base64Serializer.serialize(generateProfiles(count)); - - int partitionCount = 4; - Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect()); - - configs.put("streams.PageView.samza.system", "test"); - configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews)); - configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount)); - - final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable()); - final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable()); - final StreamApplication app = appDesc -> { - RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1"); - inputTableDesc - .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles)) - .withRateLimiter(readRateLimiter, null, null); - - // dummy reader - TableReadFunction readFn = new MyReadFunction(); - - RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1"); - outputTableDesc - .withReadFunction(readFn) - .withWriteFunction(writer) - .withRateLimiter(writeRateLimiter, null, null); - - Table<KV<Integer, EnrichedPageView>> outputTable = withCache - ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc) - : appDesc.getTable(outputTableDesc); - - Table<KV<Integer, Profile>> inputTable = withCache - ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc) - : appDesc.getTable(inputTableDesc); - - DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); - appDesc.getInputStream(isd) - .map(pv -> new KV<>(pv.getMemberId(), pv)) - .join(inputTable, new PageViewToProfileJoinFunction()) - .map(m -> new KV(m.getMemberId(), m)) - .sendTo(outputTable); - }; - - Config config = new MapConfig(configs); - final LocalApplicationRunner runner = new LocalApplicationRunner(app, config); - executeRun(runner, config); - runner.waitForFinish(); - - int numExpected = count * partitionCount; - Assert.assertEquals(numExpected, writtenRecords.get(testName).size()); - Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView); - } - - @Test - public void testStreamTableJoinRemoteTable() throws Exception { - doTestStreamTableJoinRemoteTable(false, false, "testStreamTableJoinRemoteTable"); - } - - @Test - public void testStreamTableJoinRemoteTableWithCache() throws Exception { - doTestStreamTableJoinRemoteTable(true, false, "testStreamTableJoinRemoteTableWithCache"); - } - - @Test - public void testStreamTableJoinRemoteTableWithDefaultCache() throws Exception { - doTestStreamTableJoinRemoteTable(true, true, "testStreamTableJoinRemoteTableWithDefaultCache"); - } - - private Context createMockContext() { - MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString()); - doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString()); - Context context = new MockContext(); - doReturn(new MapConfig()).when(context.getJobContext()).getConfig(); - doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry(); - return context; - } - - @Test(expected = SamzaException.class) - public void testCatchReaderException() { - TableReadFunction<String, ?> reader = mock(TableReadFunction.class); - CompletableFuture<String> future = new CompletableFuture<>(); - future.completeExceptionally(new RuntimeException("Expected test exception")); - doReturn(future).when(reader).getAsync(anyString()); - TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class); - RemoteReadableTable<String, ?> table = new RemoteReadableTable<>( - "table1", reader, rateLimitHelper, Executors.newSingleThreadExecutor(), null); - table.init(createMockContext()); - table.get("abc"); - } - - @Test(expected = SamzaException.class) - public void testCatchWriterException() { - TableReadFunction<String, String> reader = mock(TableReadFunction.class); - TableWriteFunction<String, String> writer = mock(TableWriteFunction.class); - CompletableFuture<String> future = new CompletableFuture<>(); - future.completeExceptionally(new RuntimeException("Expected test exception")); - doReturn(future).when(writer).putAsync(anyString(), any()); - TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class); - RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<String, String>( - "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null); - table.init(createMockContext()); - table.put("abc", "efg"); - } -}
