Repository: samza
Updated Branches:
  refs/heads/master e3efdf5c8 -> e74998c5e


http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
new file mode 100644
index 0000000..fead086
--- /dev/null
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.samza.table.ReadableTable;
+
+
+/**
+ * A store backed readable table
+ *
+ * <p>Reads are delegated to the {@link KeyValueStore} supplied at construction time;
+ * this class never closes that store.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> {
+
+  protected KeyValueStore<K, V> kvStore;
+
+  /**
+   * Constructs an instance of {@link LocalStoreBackedReadableTable}
+   * @param kvStore the backing store
+   */
+  public LocalStoreBackedReadableTable(KeyValueStore<K, V> kvStore) {
+    this.kvStore = kvStore;
+  }
+
+  /**
+   * Looks up a single key in the backing store.
+   * @param key the key to look up
+   * @return whatever the backing store returns for {@code key}
+   */
+  @Override
+  public V get(K key) {
+    return kvStore.get(key);
+  }
+
+  /**
+   * Looks up every key in {@code keys}, issuing one store read per list element.
+   * @param keys the keys to look up; may contain duplicates
+   * @return a map from each distinct key to the value the backing store returned for it
+   */
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    // Collectors.toMap throws NullPointerException when a mapped value is null and
+    // IllegalStateException on a repeated key, so the map is populated explicitly instead.
+    Map<K, V> result = new HashMap<>();
+    for (K key : keys) {
+      result.put(key, kvStore.get(key));
+    }
+    return result;
+  }
+
+  @Override
+  public void close() {
+    // The KV store is not closed here as it may still be needed by downstream operators,
+    // it will be closed by the SamzaContainer
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
new file mode 100644
index 0000000..9c95637
--- /dev/null
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.table.TableSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link BaseLocalStoreBackedTableProvider}, exercised through an anonymous
+ * subclass whose {@code generateConfig} forwards to {@code generateCommonStoreConfig}.
+ */
+public class TestLocalBaseStoreBackedTableProvider {
+
+  private BaseLocalStoreBackedTableProvider tableProvider;
+
+  /** Builds a provider around a mocked table spec whose id is "t1". */
+  @Before
+  public void prepare() {
+    TableSpec spec = mock(TableSpec.class);
+    when(spec.getId()).thenReturn("t1");
+    tableProvider = new BaseLocalStoreBackedTableProvider(spec) {
+      @Override
+      public Map<String, String> generateConfig(Map<String, String> config) {
+        return generateCommonStoreConfig(config);
+      }
+    };
+  }
+
+  /** After init() with a store, getTable() must hand back a non-null table. */
+  @Test
+  public void testInit() {
+    StorageEngine storageEngine = mock(KeyValueStorageEngine.class);
+    tableProvider.init(storageEngine);
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  /** Without a prior init(), getTable() is expected to throw a SamzaException. */
+  @Test(expected = SamzaException.class)
+  public void testInitFail() {
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  /** The table's key/value serde settings must show up as the store's key/msg serde settings. */
+  @Test
+  public void testGenerateCommonStoreConfig() {
+    Map<String, String> jobConfig = new HashMap<>();
+    jobConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    jobConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    Map<String, String> generated = tableProvider.generateConfig(jobConfig);
+
+    String storeKeySerde = generated.get(String.format(StorageConfig.KEY_SERDE(), "t1"));
+    String storeMsgSerde = generated.get(String.format(StorageConfig.MSG_SERDE(), "t1"));
+    Assert.assertEquals("ks1", storeKeySerde);
+    Assert.assertEquals("vs1", storeMsgSerde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
new file mode 100644
index 0000000..8f7eb5d
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import org.apache.samza.test.table.TestTableData.PageView;
+import org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
+import org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
+import org.apache.samza.test.table.TestTableData.Profile;
+import org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+import org.apache.samza.test.util.ArraySystemFactory;
+import org.apache.samza.test.util.Base64Serializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test class tests sendTo() and join() for local tables
+ */
+public class TestLocalTable extends AbstractIntegrationTestHarness {
+
+  /**
+   * Sends the "Profile" input stream into in-memory table "t1" via {@code sendTo()} and checks
+   * that every profile seen by the map function can afterwards be read back from the table.
+   */
+  @Test
+  public void testSendTo() throws Exception {
+
+    int count = 10;
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig();
+
+    configs.put("streams.Profile.samza.system", "test");
+    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+    MyMapFunction mapFn = new MyMapFunction();
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    final StreamApplication app = (streamGraph, cfg) -> {
+
+      Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1")
+          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+
+      streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
+          .map(mapFn)
+          .sendTo(table);
+    };
+
+    runner.run(app);
+    runner.waitForFinish();
+
+    // The same profile array is expected once per partition
+    assertEquals(count * partitionCount, mapFn.received.size());
+    // Profile equality is by memberId, so the per-partition copies collapse in a set
+    assertEquals(count, new HashSet<>(mapFn.received).size());
+    mapFn.received.forEach(p -> Assert.assertNotNull(mapFn.table.get(p.getMemberId())));
+  }
+
+  /**
+   * Joins a re-partitioned "PageView" stream against table "t1", which is filled from the
+   * bootstrap "Profile" stream, and checks how many page views were seen and joined.
+   */
+  @Test
+  public void testStreamTableJoin() throws Exception {
+
+    final int messageCount = 10;
+    final int partitions = 4;
+
+    List<PageView> seenPageViews = new LinkedList<>();
+    List<EnrichedPageView> enriched = new LinkedList<>();
+
+    PageView[] pageViews = TestTableData.generatePageViews(messageCount);
+    Profile[] profiles = TestTableData.generateProfiles(messageCount);
+
+    Map<String, String> jobConfig = getBaseJobConfig();
+
+    // Page views: plain input stream
+    jobConfig.put("streams.PageView.samza.system", "test");
+    jobConfig.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+    jobConfig.put("streams.PageView.partitionCount", String.valueOf(partitions));
+
+    // Profiles: bootstrap stream feeding the table
+    jobConfig.put("streams.Profile.samza.system", "test");
+    jobConfig.put("streams.Profile.samza.bootstrap", "true");
+    jobConfig.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    jobConfig.put("streams.Profile.partitionCount", String.valueOf(partitions));
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(jobConfig));
+    final StreamApplication app = (streamGraph, cfg) -> {
+
+      Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(
+          new InMemoryTableDescriptor("t1")
+              .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+
+      streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
+          .map(profile -> new KV(profile.getMemberId(), profile))
+          .sendTo(profileTable);
+
+      streamGraph.getInputStream("PageView", new NoOpSerde<PageView>())
+          .map(pageView -> {
+              seenPageViews.add(pageView);
+              return pageView;
+            })
+          .partitionBy(PageView::getMemberId, v -> v, "p1")
+          .join(profileTable, new PageViewToProfileJoinFunction())
+          .sink((message, collector, coordinator) -> enriched.add(message));
+    };
+
+    runner.run(app);
+    runner.waitForFinish();
+
+    final int expected = messageCount * partitions;
+    assertEquals(expected, seenPageViews.size());
+    assertEquals(expected, enriched.size());
+    assertTrue(enriched.get(0) instanceof EnrichedPageView);
+  }
+
+  /**
+   * Feeds two bootstrap profile streams into the same table "t1" and joins two separately
+   * re-partitioned page-view streams against it, checking the counts observed at each stage.
+   */
+  @Test
+  public void testDualStreamTableJoin() throws Exception {
+
+    List<Profile> sentToProfileTable1 = new LinkedList<>();
+    List<Profile> sentToProfileTable2 = new LinkedList<>();
+    List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
+    List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
+
+    KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
+    KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
+
+    PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
+    PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
+
+    int count = 10;
+    PageView[] pageViews = TestTableData.generatePageViews(count);
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig();
+
+    configs.put("streams.Profile1.samza.system", "test");
+    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile1.samza.bootstrap", "true");
+    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.Profile2.samza.system", "test");
+    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile2.samza.bootstrap", "true");
+    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.PageView1.samza.system", "test");
+    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.PageView2.samza.system", "test");
+    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    final StreamApplication app = (streamGraph, cfg) -> {
+
+      // Reuse the serde declared above instead of building an identical second instance
+      Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1")
+          .withSerde(profileKVSerde));
+
+      MessageStream<Profile> profileStream1 = streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>());
+      MessageStream<Profile> profileStream2 = streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>());
+
+      profileStream1
+          .map(m -> {
+              sentToProfileTable1.add(m);
+              return new KV(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+      profileStream2
+          .map(m -> {
+              sentToProfileTable2.add(m);
+              return new KV(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+
+      MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>());
+      MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>());
+
+      pageViewStream1
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
+          .join(profileTable, joinFn1)
+          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
+
+      pageViewStream2
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
+          .join(profileTable, joinFn2)
+          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
+    };
+
+    runner.run(app);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, sentToProfileTable1.size());
+    assertEquals(count * partitionCount, sentToProfileTable2.size());
+    assertEquals(count * partitionCount, joinFn1.count);
+    assertEquals(count * partitionCount, joinFn2.count);
+    assertEquals(count * partitionCount, joinedPageViews1.size());
+    assertEquals(count * partitionCount, joinedPageViews2.size());
+    assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView);
+    assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
+  }
+
+  /**
+   * Builds the configuration shared by the tests in this class: the array-backed "test" system,
+   * job identity and coordination factories, the "kafka" system used as the default system,
+   * and the "int" / "json" serde registrations.
+   *
+   * @return a fresh, mutable config map that callers extend with per-test stream settings
+   */
+  private Map<String, String> getBaseJobConfig() {
+    Map<String, String> base = new HashMap<>();
+
+    // Input system backed by in-config arrays
+    base.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
+
+    // Job identity, coordination and task grouping
+    base.put(JobConfig.JOB_NAME(), "test-table-job");
+    base.put(JobConfig.PROCESSOR_ID(), "1");
+    base.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY,
+        PassthroughCoordinationUtilsFactory.class.getName());
+    base.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
+        PassthroughJobCoordinatorFactory.class.getName());
+    base.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    // For intermediate streams
+    base.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    base.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    base.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    base.put("systems.kafka.samza.key.serde", "int");
+    base.put("systems.kafka.samza.msg.serde", "json");
+    base.put("systems.kafka.default.stream.replication.factor", "1");
+    base.put("job.default.system", "kafka");
+
+    // Serdes referenced by name above
+    base.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
+    base.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
+
+    return base;
+  }
+
+  /**
+   * Map function that records every {@link Profile} it receives and re-keys it by member id.
+   * It also keeps a handle to table "t1", obtained from the task context in {@code init}.
+   */
+  private class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
+
+    private List<Profile> received = new ArrayList<>();
+    private ReadableTable table;
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      table = (ReadableTable) context.getTable("t1");
+    }
+
+    @Override
+    public KV<Integer, Profile> apply(Profile profile) {
+      received.add(profile);
+      // Diamond rather than the raw KV type, so the result is checked against the declared return type
+      return new KV<>(profile.getMemberId(), profile);
+    }
+  }
+
+  private class PageViewToProfileJoinFunction implements 
StreamTableJoinFunction
+      <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> 
{
+    private int count;
+    @Override
+    public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, 
Profile> r) {
+      ++count;
+      return r == null ? null :
+          new EnrichedPageView(m.getValue().getPageKey(), m.getKey(), 
r.getValue().getCompany());
+    }
+
+    @Override
+    public Integer getMessageKey(KV<Integer, PageView> message) {
+      return message.getKey();
+    }
+
+    @Override
+    public Integer getRecordKey(KV<Integer, Profile> record) {
+      return record.getKey();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
new file mode 100644
index 0000000..dfd0d1b
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.Serializable;
+import java.util.Random;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+
+public class TestTableData {
+
+  /**
+   * Immutable page-view event: a page key plus the id of the member who viewed it,
+   * bound to the JSON properties "pageKey" and "memberId".
+   */
+  public static class PageView implements Serializable {
+    @JsonProperty("pageKey")
+    final String pageKey;
+    @JsonProperty("memberId")
+    final int memberId;
+
+    /** @return the key of the viewed page */
+    @JsonProperty("pageKey")
+    public String getPageKey() {
+      return this.pageKey;
+    }
+
+    /** @return the id of the viewing member */
+    @JsonProperty("memberId")
+    public int getMemberId() {
+      return this.memberId;
+    }
+
+    /**
+     * @param pageKey the key of the viewed page
+     * @param memberId the id of the viewing member
+     */
+    @JsonCreator
+    public PageView(@JsonProperty("pageKey") String pageKey, @JsonProperty("memberId") int memberId) {
+      this.pageKey = pageKey;
+      this.memberId = memberId;
+    }
+  }
+
+  public static class Profile implements Serializable {
+    @JsonProperty("memberId")
+    final int memberId;
+
+    @JsonProperty("company")
+    final String company;
+
+    @JsonProperty("memberId")
+    public int getMemberId() {
+      return memberId;
+    }
+
+    @JsonProperty("company")
+    public String getCompany() {
+      return company;
+    }
+
+    @JsonCreator
+    public Profile(@JsonProperty("memberId") int memberId, 
@JsonProperty("company") String company) {
+      this.memberId = memberId;
+      this.company = company;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || !(o instanceof Profile)) {
+        return false;
+      }
+      return ((Profile) o).getMemberId() == memberId;
+    }
+
+    @Override
+    public int hashCode() {
+      return memberId;
+    }
+  }
+
+  /**
+   * A {@link PageView} extended with the viewing member's company (JSON property "company").
+   */
+  public static class EnrichedPageView extends PageView {
+
+    @JsonProperty("company")
+    final String company;
+
+    /** @return the company attached to this page view */
+    @JsonProperty("company")
+    public String getCompany() {
+      return this.company;
+    }
+
+    /**
+     * @param pageKey the key of the viewed page
+     * @param memberId the id of the viewing member
+     * @param company the viewing member's company
+     */
+    @JsonCreator
+    public EnrichedPageView(
+        @JsonProperty("pageKey") String pageKey,
+        @JsonProperty("memberId") int memberId,
+        @JsonProperty("company") String company) {
+      super(pageKey, memberId);
+      this.company = company;
+    }
+  }
+
+  /** Factory that hands out a new {@link PageViewJsonSerde}; both arguments are ignored. */
+  public static class PageViewJsonSerdeFactory implements SerdeFactory<PageView> {
+    @Override
+    public Serde<PageView> getSerde(String name, Config config) {
+      Serde<PageView> serde = new PageViewJsonSerde();
+      return serde;
+    }
+  }
+
+  /** Factory that hands out a new {@link ProfileJsonSerde}; both arguments are ignored. */
+  public static class ProfileJsonSerdeFactory implements SerdeFactory<Profile> {
+    @Override
+    public Serde<Profile> getSerde(String name, Config config) {
+      Serde<Profile> serde = new ProfileJsonSerde();
+      return serde;
+    }
+  }
+
+  /**
+   * Serde that converts a {@link PageView} to and from UTF-8 encoded JSON text.
+   */
+  public static class PageViewJsonSerde implements Serde<PageView> {
+
+    // One mapper for all calls instead of a new one per message; static, so it is
+    // not part of this serde's instance state.
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * @param bytes UTF-8 encoded JSON
+     * @return the decoded page view
+     * @throws SamzaException wrapping any decoding or parsing failure
+     */
+    @Override
+    public PageView fromBytes(byte[] bytes) {
+      try {
+        return MAPPER.readValue(new String(bytes, "UTF-8"), new TypeReference<PageView>() { });
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    /**
+     * @param pv the page view to encode
+     * @return the UTF-8 bytes of its JSON form
+     * @throws SamzaException wrapping any serialization failure
+     */
+    @Override
+    public byte[] toBytes(PageView pv) {
+      try {
+        return MAPPER.writeValueAsString(pv).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  /**
+   * Serde that converts a {@link Profile} to and from UTF-8 encoded JSON text.
+   */
+  public static class ProfileJsonSerde implements Serde<Profile> {
+
+    // One mapper for all calls instead of a new one per message; static, so it is
+    // not part of this serde's instance state.
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * @param bytes UTF-8 encoded JSON
+     * @return the decoded profile
+     * @throws SamzaException wrapping any decoding or parsing failure
+     */
+    @Override
+    public Profile fromBytes(byte[] bytes) {
+      try {
+        return MAPPER.readValue(new String(bytes, "UTF-8"), new TypeReference<Profile>() { });
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    /**
+     * @param p the profile to encode
+     * @return the UTF-8 bytes of its JSON form
+     * @throws SamzaException wrapping any serialization failure
+     */
+    @Override
+    public byte[] toBytes(Profile p) {
+      try {
+        return MAPPER.writeValueAsString(p).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
+
+  /**
+   * Generates page views with a random page key from {@code PAGEKEYS} and a random
+   * member id in the range [0, 10).
+   *
+   * @param count number of page views to generate
+   * @return an array of {@code count} page views
+   */
+  public static PageView[] generatePageViews(int count) {
+    Random random = new Random();
+    PageView[] pageviews = new PageView[count];
+    for (int i = 0; i < count; i++) {
+      // nextInt's bound is exclusive: pass the full length so the last key can be picked too
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length)];
+      int memberId = random.nextInt(10);
+      pageviews[i] = new PageView(pagekey, memberId);
+    }
+    return pageviews;
+  }
+
+  private static final String[] COMPANIES = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
+
+  /**
+   * Generates profiles with member ids 0 .. count-1, each with a random company
+   * from {@code COMPANIES}.
+   *
+   * @param count number of profiles to generate
+   * @return an array of {@code count} profiles
+   */
+  public static Profile[] generateProfiles(int count) {
+    Random random = new Random();
+    Profile[] profiles = new Profile[count];
+    for (int i = 0; i < count; i++) {
+      // nextInt's bound is exclusive: pass the full length so the last company can be picked too
+      String company = COMPANIES[random.nextInt(COMPANIES.length)];
+      profiles[i] = new Profile(i, company);
+    }
+    return profiles;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java 
b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
index 832457b..6ba28ae 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -58,9 +59,10 @@ public class ArraySystemConsumer implements SystemConsumer {
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
poll(Set<SystemStreamPartition> set, long l) throws InterruptedException {
     if (!done) {
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = 
new HashMap<>();
+      final AtomicInteger offset = new AtomicInteger(0);
       set.forEach(ssp -> {
           List<IncomingMessageEnvelope> envelopes = 
Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
-              .map(object -> new IncomingMessageEnvelope(ssp, null, null, 
object)).collect(Collectors.toList());
+              .map(object -> new IncomingMessageEnvelope(ssp, 
String.valueOf(offset.incrementAndGet()), null, 
object)).collect(Collectors.toList());
           envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
           envelopeMap.put(ssp, envelopes);
         });

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java 
b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
index 8890a2f..c735c74 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
@@ -24,7 +24,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -50,13 +52,17 @@ public class SimpleSystemAdmin implements SystemAdmin {
   @Override
   public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> 
streamNames) {
     return streamNames.stream()
-        .collect(Collectors.toMap(
-            Function.<String>identity(),
-            streamName -> {
+        .collect(Collectors.toMap(Function.identity(), streamName -> {
+            int messageCount = isBootstrapStream(streamName) ? 
getMessageCount(streamName) : -1;
+            String oldestOffset = messageCount < 0 ? null : "0";
+            String newestOffset = messageCount < 0 ? null : 
String.valueOf(messageCount - 1);
+            String upcomingOffset = messageCount < 0 ? null : 
String.valueOf(messageCount);
             Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
metadataMap = new HashMap<>();
             int partitionCount = config.getInt("streams." + streamName + 
".partitionCount", 1);
             for (int i = 0; i < partitionCount; i++) {
-              metadataMap.put(new Partition(i), new 
SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
+              metadataMap.put(new Partition(i), new 
SystemStreamMetadata.SystemStreamPartitionMetadata(
+                  oldestOffset, newestOffset, upcomingOffset
+              ));
             }
             return new SystemStreamMetadata(streamName, metadataMap);
           }));
@@ -71,5 +77,17 @@ public class SimpleSystemAdmin implements SystemAdmin {
     }
     return offset1.compareTo(offset2);
   }
+
+  /**
+   * Returns the length of the object array deserialized from the stream's
+   * "streams.&lt;streamName&gt;.source" config value.
+   *
+   * @param streamName the stream whose configured source is inspected
+   * @return the number of elements in the deserialized array
+   * @throws SamzaException wrapping any failure while reading or deserializing the value
+   */
+  private int getMessageCount(String streamName) {
+    String sourceKey = "streams." + streamName + ".source";
+    try {
+      Object[] messages = Base64Serializer.deserialize(config.get(sourceKey), Object[].class);
+      return messages.length;
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Tells whether "streams.&lt;streamName&gt;.samza.bootstrap" is set to "true" (case-insensitive);
+   * a missing setting counts as "false".
+   */
+  private boolean isBootstrapStream(String streamName) {
+    String flag = config.get("streams." + streamName + ".samza.bootstrap", "false");
+    return Boolean.parseBoolean(flag);
+  }
 }
 

Reply via email to