Repository: samza
Updated Branches:
  refs/heads/master bc4a0c2de -> 53d7f2625


http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index 23fa9e6..d7f0570 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -21,11 +21,11 @@ package org.apache.samza.test.table;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -96,17 +96,59 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     runner.run(app);
     runner.waitForFinish();
 
-    assertEquals(count * partitionCount, mapFn.received.size());
-    assertEquals(count, new HashSet(mapFn.received).size());
-    mapFn.received.forEach(p -> 
Assert.assertTrue(mapFn.table.get(p.getMemberId()) != null));
+    for (int i = 0; i < partitionCount; i++) {
+      MyMapFunction mapFnCopy = 
MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
+      assertEquals(count, mapFnCopy.received.size());
+      mapFnCopy.received.forEach(p -> 
Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
+    }
+  }
+
+  static class TestStreamTableJoin {
+    static List<PageView> received = new LinkedList<>();
+    static List<EnrichedPageView> joined = new LinkedList<>();
+    final int count;
+    final int partitionCount;
+    final Map<String, String> configs;
+
+    TestStreamTableJoin(int count, int partitionCount, Map<String, String> 
configs) {
+      this.count = count;
+      this.partitionCount = partitionCount;
+      this.configs = configs;
+    }
+
+    void runTest() {
+      final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
+      final StreamApplication app = (streamGraph, cfg) -> {
+
+        Table<KV<Integer, Profile>> table = streamGraph.getTable(
+            new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new 
IntegerSerde(), new ProfileJsonSerde())));
+
+        streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
+            .map(m -> new KV(m.getMemberId(), m))
+            .sendTo(table);
+
+        streamGraph.getInputStream("PageView", new NoOpSerde<PageView>())
+            .map(pv -> {
+                received.add(pv);
+                return pv;
+              })
+            .partitionBy(PageView::getMemberId, v -> v, "p1")
+            .join(table, new PageViewToProfileJoinFunction())
+            .sink((m, collector, coordinator) -> joined.add(m));
+      };
+
+      runner.run(app);
+      runner.waitForFinish();
+
+      assertEquals(count * partitionCount, received.size());
+      assertEquals(count * partitionCount, joined.size());
+      assertTrue(joined.get(0) instanceof EnrichedPageView);
+    }
   }
 
   @Test
   public void testStreamTableJoin() throws Exception {
 
-    List<PageView> received = new LinkedList<>();
-    List<EnrichedPageView> joined = new LinkedList<>();
-
     int count = 10;
     PageView[] pageViews = TestTableData.generatePageViews(count);
     Profile[] profiles = TestTableData.generateProfiles(count);
@@ -123,48 +165,89 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     configs.put("streams.Profile.source", 
Base64Serializer.serialize(profiles));
     configs.put("streams.Profile.partitionCount", 
String.valueOf(partitionCount));
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-    final StreamApplication app = (streamGraph, cfg) -> {
-
-      Table<KV<Integer, Profile>> table = streamGraph.getTable(new 
InMemoryTableDescriptor("t1")
-          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+    TestStreamTableJoin joinTest = new TestStreamTableJoin(count, 
partitionCount, configs);
+    joinTest.runTest();
+  }
 
-      streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
-          .map(m -> new KV(m.getMemberId(), m))
-          .sendTo(table);
+  static class TestDualStreamTableJoin {
+    static List<Profile> sentToProfileTable1 = new LinkedList<>();
+    static List<Profile> sentToProfileTable2 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
+    final int count;
+    final int partitionCount;
+    final Map<String, String> configs;
+
+    TestDualStreamTableJoin(int count, int partitionCount, Map<String, String> 
configs) {
+      this.count = count;
+      this.partitionCount = partitionCount;
+      this.configs = configs;
+    }
 
-      streamGraph.getInputStream("PageView", new NoOpSerde<PageView>())
-          .map(pv -> {
-              received.add(pv);
-              return pv;
-            })
-          .partitionBy(PageView::getMemberId, v -> v, "p1")
-          .join(table, new PageViewToProfileJoinFunction())
-          .sink((m, collector, coordinator) -> joined.add(m));
-    };
+    void runTest() {
+      KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new 
IntegerSerde(), new ProfileJsonSerde());
+      KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new 
IntegerSerde(), new PageViewJsonSerde());
+
+      PageViewToProfileJoinFunction joinFn1 = new 
PageViewToProfileJoinFunction();
+      PageViewToProfileJoinFunction joinFn2 = new 
PageViewToProfileJoinFunction();
+
+      final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
+      final StreamApplication app = (streamGraph, cfg) -> {
+
+        Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new 
InMemoryTableDescriptor("t1")
+            .withSerde(profileKVSerde));
+
+        MessageStream<Profile> profileStream1 = 
streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>());
+        MessageStream<Profile> profileStream2 = 
streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>());
+
+        profileStream1
+            .map(m -> {
+                sentToProfileTable1.add(m);
+                return new KV(m.getMemberId(), m);
+              })
+            .sendTo(profileTable);
+        profileStream2
+            .map(m -> {
+                sentToProfileTable2.add(m);
+                return new KV(m.getMemberId(), m);
+              })
+            .sendTo(profileTable);
+
+        MessageStream<PageView> pageViewStream1 = 
streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>());
+        MessageStream<PageView> pageViewStream2 = 
streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>());
+
+        pageViewStream1
+            .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
+            .join(profileTable, joinFn1)
+            .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
+
+        pageViewStream2
+            .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
+            .join(profileTable, joinFn2)
+            .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
+      };
+
+      runner.run(app);
+      runner.waitForFinish();
+
+      assertEquals(count * partitionCount, sentToProfileTable1.size());
+      assertEquals(count * partitionCount, sentToProfileTable2.size());
+
+      for (int i = 0; i < PageViewToProfileJoinFunction.seqNo; i++) {
+        assertEquals(count * partitionCount, 
PageViewToProfileJoinFunction.counterPerJoinFn.get(i).intValue());
+      }
+      assertEquals(count * partitionCount, joinedPageViews1.size());
+      assertEquals(count * partitionCount, joinedPageViews2.size());
+      assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView);
+      assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
 
-    runner.run(app);
-    runner.waitForFinish();
+    }
 
-    assertEquals(count * partitionCount, received.size());
-    assertEquals(count * partitionCount, joined.size());
-    assertTrue(joined.get(0) instanceof EnrichedPageView);
   }
 
   @Test
   public void testDualStreamTableJoin() throws Exception {
 
-    List<Profile> sentToProfileTable1 = new LinkedList<>();
-    List<Profile> sentToProfileTable2 = new LinkedList<>();
-    List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
-    List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
-
-    KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), 
new ProfileJsonSerde());
-    KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new 
IntegerSerde(), new PageViewJsonSerde());
-
-    PageViewToProfileJoinFunction joinFn1 = new 
PageViewToProfileJoinFunction();
-    PageViewToProfileJoinFunction joinFn2 = new 
PageViewToProfileJoinFunction();
-
     int count = 10;
     PageView[] pageViews = TestTableData.generatePageViews(count);
     Profile[] profiles = TestTableData.generateProfiles(count);
@@ -190,53 +273,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     configs.put("streams.PageView2.source", 
Base64Serializer.serialize(pageViews));
     configs.put("streams.PageView2.partitionCount", 
String.valueOf(partitionCount));
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-    final StreamApplication app = (streamGraph, cfg) -> {
-
-      Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new 
InMemoryTableDescriptor("t1")
-          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-
-      MessageStream<Profile> profileStream1 = 
streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>());
-      MessageStream<Profile> profileStream2 = 
streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>());
-
-      profileStream1
-          .map(m -> {
-              sentToProfileTable1.add(m);
-              return new KV(m.getMemberId(), m);
-            })
-          .sendTo(profileTable);
-      profileStream2
-          .map(m -> {
-              sentToProfileTable2.add(m);
-              return new KV(m.getMemberId(), m);
-            })
-          .sendTo(profileTable);
-
-      MessageStream<PageView> pageViewStream1 = 
streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>());
-      MessageStream<PageView> pageViewStream2 = 
streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>());
-
-      pageViewStream1
-          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
-          .join(profileTable, joinFn1)
-          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
-
-      pageViewStream2
-          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
-          .join(profileTable, joinFn2)
-          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
-    };
-
-    runner.run(app);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, sentToProfileTable1.size());
-    assertEquals(count * partitionCount, sentToProfileTable2.size());
-    assertEquals(count * partitionCount, joinFn1.count);
-    assertEquals(count * partitionCount, joinFn2.count);
-    assertEquals(count * partitionCount, joinedPageViews1.size());
-    assertEquals(count * partitionCount, joinedPageViews2.size());
-    assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView);
-    assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
+    TestDualStreamTableJoin dualJoinTest = new TestDualStreamTableJoin(count, 
partitionCount, configs);
+    dualJoinTest.runTest();
   }
 
   static Map<String, String> getBaseJobConfig(String bootstrapUrl, String 
zkConnect) {
@@ -264,14 +302,19 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     return configs;
   }
 
-  private class MyMapFunction implements MapFunction<Profile, KV<Integer, 
Profile>> {
+  private static class MyMapFunction implements MapFunction<Profile, 
KV<Integer, Profile>> {
 
-    private List<Profile> received = new ArrayList<>();
-    private ReadableTable table;
+    private static Map<String, MyMapFunction> taskToMapFunctionMap = new 
HashMap<>();
+
+    private transient List<Profile> received;
+    private transient ReadableTable table;
 
     @Override
     public void init(Config config, TaskContext context) {
       table = (ReadableTable) context.getTable("t1");
+      this.received = new ArrayList<>();
+
+      taskToMapFunctionMap.put(context.getTaskName().getTaskName(), this);
     }
 
     @Override
@@ -279,14 +322,30 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
       received.add(profile);
       return new KV(profile.getMemberId(), profile);
     }
+
+    public static MyMapFunction getMapFunctionByTask(String taskName) {
+      return taskToMapFunctionMap.get(taskName);
+    }
   }
 
   static class PageViewToProfileJoinFunction implements StreamTableJoinFunction
       <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> 
{
-    private int count;
+    private static Map<Integer, AtomicInteger> counterPerJoinFn = new 
HashMap<>();
+    private static int seqNo = 0;
+    private final int currentSeqNo;
+
+    public PageViewToProfileJoinFunction() {
+      this.currentSeqNo = seqNo++;
+    }
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      counterPerJoinFn.put(this.currentSeqNo, new AtomicInteger(0));
+    }
+
     @Override
     public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, 
Profile> r) {
-      ++count;
+      counterPerJoinFn.get(this.currentSeqNo).incrementAndGet();
       return r == null ? null :
           new EnrichedPageView(m.getValue().getPageKey(), m.getKey(), 
r.getValue().getCompany());
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index a260c3f..208c670 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -60,15 +60,33 @@ import static org.mockito.Mockito.mock;
 
 
 public class TestRemoteTable extends AbstractIntegrationTestHarness {
-  private TableReadFunction<Integer, TestTableData.Profile> 
getInMemoryReader(TestTableData.Profile[] profiles) {
-    final Map<Integer, TestTableData.Profile> profileMap = 
Arrays.stream(profiles)
-        .collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
-    TableReadFunction<Integer, TestTableData.Profile> reader =
-        (TableReadFunction<Integer, TestTableData.Profile>) key -> 
profileMap.getOrDefault(key, null);
-    return reader;
-  }
 
   static List<TestTableData.EnrichedPageView> writtenRecords = new 
LinkedList<>();
+  static List<TestTableData.PageView> received = new LinkedList<>();
+
+  static class InMemoryReadFunction implements TableReadFunction<Integer, 
TestTableData.Profile> {
+    private final String serializedProfiles;
+    private transient Map<Integer, TestTableData.Profile> profileMap;
+
+    private InMemoryReadFunction(String profiles) {
+      this.serializedProfiles = profiles;
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+      in.defaultReadObject();
+      TestTableData.Profile[] profiles = 
Base64Serializer.deserialize(this.serializedProfiles, 
TestTableData.Profile[].class);
+      this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> 
p.getMemberId(), Function.identity()));
+    }
+
+    @Override
+    public TestTableData.Profile get(Integer key) {
+      return profileMap.getOrDefault(key, null);
+    }
+
+    static InMemoryReadFunction getInMemoryReadFunction(String 
serializedProfiles) {
+      return new InMemoryReadFunction(serializedProfiles);
+    }
+  }
 
   static class InMemoryWriteFunction implements TableWriteFunction<Integer, 
TestTableData.EnrichedPageView> {
     private transient List<TestTableData.EnrichedPageView> records;
@@ -99,12 +117,11 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
 
   @Test
   public void testStreamTableJoinRemoteTable() throws Exception {
-    List<TestTableData.PageView> received = new LinkedList<>();
     final InMemoryWriteFunction writer = new InMemoryWriteFunction();
 
     int count = 10;
     TestTableData.PageView[] pageViews = 
TestTableData.generatePageViews(count);
-    TestTableData.Profile[] profiles = TestTableData.generateProfiles(count);
+    String profiles = 
Base64Serializer.serialize(TestTableData.generateProfiles(count));
 
     int partitionCount = 4;
     Map<String, String> configs = 
TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
@@ -119,7 +136,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     final StreamApplication app = (streamGraph, cfg) -> {
       RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = 
new RemoteTableDescriptor<>("profile-table-1");
       inputTableDesc
-          .withReadFunction(getInMemoryReader(profiles))
+          
.withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
           .withRateLimiter(readRateLimiter, null, null);
 
       RemoteTableDescriptor<Integer, TestTableData.EnrichedPageView> 
outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
index 27d1063..94c1eca 100644
--- a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
@@ -57,12 +57,13 @@ public class TestTimerApp implements StreamApplication {
 
   private static class FlatmapTimerFn implements FlatMapFunction<PageView, 
PageView>, TimerFunction<String, PageView> {
 
-    private List<PageView> pageViews = new ArrayList<>();
-    private TimerRegistry<String> timerRegistry;
+    private transient List<PageView> pageViews;
+    private transient TimerRegistry<String> timerRegistry;
 
     @Override
     public void registerTimer(TimerRegistry<String> timerRegistry) {
       this.timerRegistry = timerRegistry;
+      this.pageViews = new ArrayList<>();
     }
 
     @Override

Reply via email to