Repository: samza Updated Branches: refs/heads/master bc4a0c2de -> 53d7f2625
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index 23fa9e6..d7f0570 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -21,11 +21,11 @@ package org.apache.samza.test.table; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -96,17 +96,59 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { runner.run(app); runner.waitForFinish(); - assertEquals(count * partitionCount, mapFn.received.size()); - assertEquals(count, new HashSet(mapFn.received).size()); - mapFn.received.forEach(p -> Assert.assertTrue(mapFn.table.get(p.getMemberId()) != null)); + for (int i = 0; i < partitionCount; i++) { + MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i)); + assertEquals(count, mapFnCopy.received.size()); + mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null)); + } + } + + static class TestStreamTableJoin { + static List<PageView> received = new LinkedList<>(); + static List<EnrichedPageView> joined = new LinkedList<>(); + final int count; + final int partitionCount; + final Map<String, String> configs; + + TestStreamTableJoin(int count, int partitionCount, Map<String, String> configs) { + this.count = count; + this.partitionCount = partitionCount; + this.configs = configs; + } + + void runTest() { + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + final StreamApplication app = (streamGraph, cfg) -> { + + Table<KV<Integer, Profile>> table = streamGraph.getTable( + new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + + streamGraph.getInputStream("Profile", new NoOpSerde<Profile>()) + .map(m -> new KV(m.getMemberId(), m)) + .sendTo(table); + + streamGraph.getInputStream("PageView", new NoOpSerde<PageView>()) + .map(pv -> { + received.add(pv); + return pv; + }) + .partitionBy(PageView::getMemberId, v -> v, "p1") + .join(table, new PageViewToProfileJoinFunction()) + .sink((m, collector, coordinator) -> joined.add(m)); + }; + + runner.run(app); + runner.waitForFinish(); + + assertEquals(count * partitionCount, received.size()); + assertEquals(count * partitionCount, joined.size()); + assertTrue(joined.get(0) instanceof EnrichedPageView); + } } @Test public void testStreamTableJoin() throws Exception { - List<PageView> received = new LinkedList<>(); - List<EnrichedPageView> joined = new LinkedList<>(); - int count = 10; PageView[] pageViews = TestTableData.generatePageViews(count); Profile[] profiles = TestTableData.generateProfiles(count); @@ -123,48 +165,89 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - final StreamApplication app = (streamGraph, cfg) -> { - - Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1") - .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + TestStreamTableJoin joinTest = new TestStreamTableJoin(count, partitionCount, configs); + joinTest.runTest(); + } - streamGraph.getInputStream("Profile", new NoOpSerde<Profile>()) - .map(m -> new KV(m.getMemberId(), m)) - .sendTo(table); + static class TestDualStreamTableJoin { + static List<Profile> sentToProfileTable1 = new LinkedList<>(); + static List<Profile> sentToProfileTable2 = new LinkedList<>(); + static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>(); + static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>(); + final int count; + final int partitionCount; + final Map<String, String> configs; + + TestDualStreamTableJoin(int count, int partitionCount, Map<String, String> configs) { + this.count = count; + this.partitionCount = partitionCount; + this.configs = configs; + } - streamGraph.getInputStream("PageView", new NoOpSerde<PageView>()) - .map(pv -> { - received.add(pv); - return pv; - }) - .partitionBy(PageView::getMemberId, v -> v, "p1") - .join(table, new PageViewToProfileJoinFunction()) - .sink((m, collector, coordinator) -> joined.add(m)); - }; + void runTest() { + KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()); + KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()); + + PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction(); + PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction(); + + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + final StreamApplication app = (streamGraph, cfg) -> { + + Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1") + .withSerde(profileKVSerde)); + + MessageStream<Profile> profileStream1 = streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>()); + MessageStream<Profile> profileStream2 = streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>()); + + profileStream1 + .map(m -> { + sentToProfileTable1.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + profileStream2 + .map(m -> { + sentToProfileTable2.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + + MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>()); + MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>()); + + pageViewStream1 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") + .join(profileTable, joinFn1) + .sink((m, collector, coordinator) -> joinedPageViews1.add(m)); + + pageViewStream2 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2") + .join(profileTable, joinFn2) + .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); + }; + + runner.run(app); + runner.waitForFinish(); + + assertEquals(count * partitionCount, sentToProfileTable1.size()); + assertEquals(count * partitionCount, sentToProfileTable2.size()); + + for (int i = 0; i < PageViewToProfileJoinFunction.seqNo; i++) { + assertEquals(count * partitionCount, PageViewToProfileJoinFunction.counterPerJoinFn.get(i).intValue()); + } + assertEquals(count * partitionCount, joinedPageViews1.size()); + assertEquals(count * partitionCount, joinedPageViews2.size()); + assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView); + assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView); - runner.run(app); - runner.waitForFinish(); + } - assertEquals(count * partitionCount, received.size()); - assertEquals(count * partitionCount, joined.size()); - assertTrue(joined.get(0) instanceof EnrichedPageView); } @Test public void testDualStreamTableJoin() throws Exception { - List<Profile> sentToProfileTable1 = new LinkedList<>(); - List<Profile> sentToProfileTable2 = new LinkedList<>(); - List<EnrichedPageView> joinedPageViews1 = new LinkedList<>(); - List<EnrichedPageView> joinedPageViews2 = new LinkedList<>(); - - KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()); - KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()); - - PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction(); - PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction(); - int count = 10; PageView[] pageViews = TestTableData.generatePageViews(count); Profile[] profiles = TestTableData.generateProfiles(count); @@ -190,53 +273,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews)); configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount)); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - final StreamApplication app = (streamGraph, cfg) -> { - - Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1") - .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); - - MessageStream<Profile> profileStream1 = streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>()); - MessageStream<Profile> profileStream2 = streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>()); - - profileStream1 - .map(m -> { - sentToProfileTable1.add(m); - return new KV(m.getMemberId(), m); - }) - .sendTo(profileTable); - profileStream2 - .map(m -> { - sentToProfileTable2.add(m); - return new KV(m.getMemberId(), m); - }) - .sendTo(profileTable); - - MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>()); - MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>()); - - pageViewStream1 - .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") - .join(profileTable, joinFn1) - .sink((m, collector, coordinator) -> joinedPageViews1.add(m)); - - pageViewStream2 - .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2") - .join(profileTable, joinFn2) - .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); - }; - - runner.run(app); - runner.waitForFinish(); - - assertEquals(count * partitionCount, sentToProfileTable1.size()); - assertEquals(count * partitionCount, sentToProfileTable2.size()); - assertEquals(count * partitionCount, joinFn1.count); - assertEquals(count * partitionCount, joinFn2.count); - assertEquals(count * partitionCount, joinedPageViews1.size()); - assertEquals(count * partitionCount, joinedPageViews2.size()); - assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView); - assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView); + TestDualStreamTableJoin dualJoinTest = new TestDualStreamTableJoin(count, partitionCount, configs); + dualJoinTest.runTest(); } static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) { @@ -264,14 +302,19 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { return configs; } - private class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> { + private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> { - private List<Profile> received = new ArrayList<>(); - private ReadableTable table; + private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>(); + + private transient List<Profile> received; + private transient ReadableTable table; @Override public void init(Config config, TaskContext context) { table = (ReadableTable) context.getTable("t1"); + this.received = new ArrayList<>(); + + taskToMapFunctionMap.put(context.getTaskName().getTaskName(), this); } @Override @@ -279,14 +322,30 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { received.add(profile); return new KV(profile.getMemberId(), profile); } + + public static MyMapFunction getMapFunctionByTask(String taskName) { + return taskToMapFunctionMap.get(taskName); + } } static class PageViewToProfileJoinFunction implements StreamTableJoinFunction <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> { - private int count; + private static Map<Integer, AtomicInteger> counterPerJoinFn = new HashMap<>(); + private static int seqNo = 0; + private final int currentSeqNo; + + public PageViewToProfileJoinFunction() { + this.currentSeqNo = seqNo++; + } + + @Override + public void init(Config config, TaskContext context) { + counterPerJoinFn.put(this.currentSeqNo, new AtomicInteger(0)); + } + @Override public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) { - ++count; + counterPerJoinFn.get(this.currentSeqNo).incrementAndGet(); return r == null ? null : new EnrichedPageView(m.getValue().getPageKey(), m.getKey(), r.getValue().getCompany()); } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index a260c3f..208c670 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -60,15 +60,33 @@ import static org.mockito.Mockito.mock; public class TestRemoteTable extends AbstractIntegrationTestHarness { - private TableReadFunction<Integer, TestTableData.Profile> getInMemoryReader(TestTableData.Profile[] profiles) { - final Map<Integer, TestTableData.Profile> profileMap = Arrays.stream(profiles) - .collect(Collectors.toMap(p -> p.getMemberId(), Function.identity())); - TableReadFunction<Integer, TestTableData.Profile> reader = - (TableReadFunction<Integer, TestTableData.Profile>) key -> profileMap.getOrDefault(key, null); - return reader; - } static List<TestTableData.EnrichedPageView> writtenRecords = new LinkedList<>(); + static List<TestTableData.PageView> received = new LinkedList<>(); + + static class InMemoryReadFunction implements TableReadFunction<Integer, TestTableData.Profile> { + private final String serializedProfiles; + private transient Map<Integer, TestTableData.Profile> profileMap; + + private InMemoryReadFunction(String profiles) { + this.serializedProfiles = profiles; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + TestTableData.Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, TestTableData.Profile[].class); + this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity())); + } + + @Override + public TestTableData.Profile get(Integer key) { + return profileMap.getOrDefault(key, null); + } + + static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) { + return new InMemoryReadFunction(serializedProfiles); + } + } static class InMemoryWriteFunction implements TableWriteFunction<Integer, TestTableData.EnrichedPageView> { private transient List<TestTableData.EnrichedPageView> records; @@ -99,12 +117,11 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { @Test public void testStreamTableJoinRemoteTable() throws Exception { - List<TestTableData.PageView> received = new LinkedList<>(); final InMemoryWriteFunction writer = new InMemoryWriteFunction(); int count = 10; TestTableData.PageView[] pageViews = TestTableData.generatePageViews(count); - TestTableData.Profile[] profiles = TestTableData.generateProfiles(count); + String profiles = Base64Serializer.serialize(TestTableData.generateProfiles(count)); int partitionCount = 4; Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect()); @@ -119,7 +136,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { final StreamApplication app = (streamGraph, cfg) -> { RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1"); inputTableDesc - .withReadFunction(getInMemoryReader(profiles)) + .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles)) .withRateLimiter(readRateLimiter, null, null); RemoteTableDescriptor<Integer, TestTableData.EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1"); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java index 27d1063..94c1eca 100644 --- a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java @@ -57,12 +57,13 @@ public class TestTimerApp implements StreamApplication { private static class FlatmapTimerFn implements FlatMapFunction<PageView, PageView>, TimerFunction<String, PageView> { - private List<PageView> pageViews = new ArrayList<>(); - private TimerRegistry<String> timerRegistry; + private transient List<PageView> pageViews; + private transient TimerRegistry<String> timerRegistry; @Override public void registerTimer(TimerRegistry<String> timerRegistry) { this.timerRegistry = timerRegistry; + this.pageViews = new ArrayList<>(); } @Override
