Repository: samza Updated Branches: refs/heads/master e3efdf5c8 -> e74998c5e
http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java new file mode 100644 index 0000000..fead086 --- /dev/null +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.samza.table.ReadableTable; + + +/** + * A store backed readable table + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> { + + protected KeyValueStore<K, V> kvStore; + + /** + * Constructs an instance of {@link LocalStoreBackedReadableTable} + * @param kvStore the backing store + */ + public LocalStoreBackedReadableTable(KeyValueStore<K, V> kvStore) { + this.kvStore = kvStore; + } + + @Override + public V get(K key) { + return kvStore.get(key); + } + + @Override + public Map<K, V> getAll(List<K> keys) { + return keys.stream().collect(Collectors.toMap(k -> k, k -> kvStore.get(k))); + } + + @Override + public void close() { + // The KV store is not closed here as it may still be needed by downstream operators, + // it will be closed by the SamzaContainer + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java new file mode 100644 index 0000000..9c95637 --- /dev/null +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.storage.StorageEngine; +import org.apache.samza.table.TableSpec; +import org.junit.Before; +import org.junit.Test; + +import junit.framework.Assert; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestLocalBaseStoreBackedTableProvider { + + private BaseLocalStoreBackedTableProvider tableProvider; + + @Before + public void prepare() { + TableSpec tableSpec = mock(TableSpec.class); + when(tableSpec.getId()).thenReturn("t1"); + tableProvider = new BaseLocalStoreBackedTableProvider(tableSpec) { + @Override + public Map<String, String> generateConfig(Map<String, String> config) { + return generateCommonStoreConfig(config); + } + }; + } + + @Test + public void testInit() { + StorageEngine store = mock(KeyValueStorageEngine.class); + tableProvider.init(store); + Assert.assertNotNull(tableProvider.getTable()); + } + + @Test(expected = SamzaException.class) + public void testInitFail() { + Assert.assertNotNull(tableProvider.getTable()); + } + + @Test + public void testGenerateCommonStoreConfig() { + Map<String, String> config = new HashMap<>(); + config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); + config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1"); + + Map<String, String> tableConfig = tableProvider.generateConfig(config); + Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1"))); + Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1"))); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java new file mode 100644 index 0000000..8f7eb5d --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.table; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.StreamTableJoinFunction; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor; +import org.apache.samza.table.ReadableTable; +import org.apache.samza.table.Table; +import org.apache.samza.task.TaskContext; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; +import org.apache.samza.test.table.TestTableData.EnrichedPageView; +import org.apache.samza.test.table.TestTableData.PageView; +import org.apache.samza.test.table.TestTableData.PageViewJsonSerde; +import org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory; +import org.apache.samza.test.table.TestTableData.Profile; +import org.apache.samza.test.table.TestTableData.ProfileJsonSerde; +import org.apache.samza.test.util.ArraySystemFactory; +import org.apache.samza.test.util.Base64Serializer; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This test class tests sendTo() and join() for local tables + */ +public class TestLocalTable extends AbstractIntegrationTestHarness { + + @Test + public void testSendTo() throws Exception { + + int count = 10; + Profile[] profiles = TestTableData.generateProfiles(count); + + int partitionCount = 4; + Map<String, String> configs = getBaseJobConfig(); + + configs.put("streams.Profile.samza.system", "test"); + configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); + + MyMapFunction mapFn = new MyMapFunction(); + + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + final StreamApplication app = (streamGraph, cfg) -> { + + Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1") + .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + + streamGraph.getInputStream("Profile", new NoOpSerde<Profile>()) + .map(mapFn) + .sendTo(table); + }; + + runner.run(app); + runner.waitForFinish(); + + assertEquals(count * partitionCount, mapFn.received.size()); + assertEquals(count, new HashSet(mapFn.received).size()); + mapFn.received.forEach(p -> Assert.assertTrue(mapFn.table.get(p.getMemberId()) != null)); + } + + @Test + public void testStreamTableJoin() throws Exception { + + List<PageView> received = new LinkedList<>(); + List<EnrichedPageView> joined = new LinkedList<>(); + + int count = 10; + PageView[] pageViews = TestTableData.generatePageViews(count); + Profile[] profiles = TestTableData.generateProfiles(count); + + int partitionCount = 4; + Map<String, String> configs = getBaseJobConfig(); + + configs.put("streams.PageView.samza.system", "test"); + configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews)); + configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.Profile.samza.system", "test"); + configs.put("streams.Profile.samza.bootstrap", "true"); + configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); + + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + final StreamApplication app = (streamGraph, cfg) -> { + + Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1") + .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + + streamGraph.getInputStream("Profile", new NoOpSerde<Profile>()) + .map(m -> new KV(m.getMemberId(), m)) + .sendTo(table); + + streamGraph.getInputStream("PageView", new NoOpSerde<PageView>()) + .map(pv -> { + received.add(pv); + return pv; + }) + .partitionBy(PageView::getMemberId, v -> v, "p1") + .join(table, new PageViewToProfileJoinFunction()) + .sink((m, collector, coordinator) -> joined.add(m)); + }; + + runner.run(app); + runner.waitForFinish(); + + assertEquals(count * partitionCount, received.size()); + assertEquals(count * partitionCount, joined.size()); + assertTrue(joined.get(0) instanceof EnrichedPageView); + } + + @Test + public void testDualStreamTableJoin() throws Exception { + + List<Profile> sentToProfileTable1 = new LinkedList<>(); + List<Profile> sentToProfileTable2 = new LinkedList<>(); + List<EnrichedPageView> joinedPageViews1 = new LinkedList<>(); + List<EnrichedPageView> joinedPageViews2 = new LinkedList<>(); + + KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()); + KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()); + + PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction(); + PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction(); + + int count = 10; + PageView[] pageViews = TestTableData.generatePageViews(count); + Profile[] profiles = TestTableData.generateProfiles(count); + + int partitionCount = 4; + Map<String, String> configs = getBaseJobConfig(); + + configs.put("streams.Profile1.samza.system", "test"); + configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile1.samza.bootstrap", "true"); + configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.Profile2.samza.system", "test"); + configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles)); + configs.put("streams.Profile2.samza.bootstrap", "true"); + configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.PageView1.samza.system", "test"); + configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews)); + configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount)); + + configs.put("streams.PageView2.samza.system", "test"); + configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews)); + configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount)); + + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + final StreamApplication app = (streamGraph, cfg) -> { + + Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1") + .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + + MessageStream<Profile> profileStream1 = streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>()); + MessageStream<Profile> profileStream2 = streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>()); + + profileStream1 + .map(m -> { + sentToProfileTable1.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + profileStream2 + .map(m -> { + sentToProfileTable2.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + + MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>()); + MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>()); + + pageViewStream1 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") + .join(profileTable, joinFn1) + .sink((m, collector, coordinator) -> joinedPageViews1.add(m)); + + pageViewStream2 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2") + .join(profileTable, joinFn2) + .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); + }; + + runner.run(app); + runner.waitForFinish(); + + assertEquals(count * partitionCount, sentToProfileTable1.size()); + assertEquals(count * partitionCount, sentToProfileTable2.size()); + assertEquals(count * partitionCount, joinFn1.count); + assertEquals(count * partitionCount, joinFn2.count); + assertEquals(count * partitionCount, joinedPageViews1.size()); + assertEquals(count * partitionCount, joinedPageViews2.size()); + assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView); + assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView); + } + + private Map<String, String> getBaseJobConfig() { + Map<String, String> configs = new HashMap<>(); + configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName()); + + configs.put(JobConfig.JOB_NAME(), "test-table-job"); + configs.put(JobConfig.PROCESSOR_ID(), "1"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + + // For intermediate streams + configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); + configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl()); + configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect()); + configs.put("systems.kafka.samza.key.serde", "int"); + configs.put("systems.kafka.samza.msg.serde", "json"); + configs.put("systems.kafka.default.stream.replication.factor", "1"); + configs.put("job.default.system", "kafka"); + + configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory"); + configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); + + return configs; + } + + private class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> { + + private List<Profile> received = new ArrayList<>(); + private ReadableTable table; + + @Override + public void init(Config config, TaskContext context) { + table = (ReadableTable) context.getTable("t1"); + } + + @Override + public KV<Integer, Profile> apply(Profile profile) { + received.add(profile); + return new KV(profile.getMemberId(), profile); + } + } + + private class PageViewToProfileJoinFunction implements StreamTableJoinFunction + <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> { + private int count; + @Override + public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) { + ++count; + return r == null ? null : + new EnrichedPageView(m.getValue().getPageKey(), m.getKey(), r.getValue().getCompany()); + } + + @Override + public Integer getMessageKey(KV<Integer, PageView> message) { + return message.getKey(); + } + + @Override + public Integer getRecordKey(KV<Integer, Profile> record) { + return record.getKey(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java new file mode 100644 index 0000000..dfd0d1b --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.table; + +import java.io.Serializable; +import java.util.Random; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + + +public class TestTableData { + + public static class PageView implements Serializable { + @JsonProperty("pageKey") + final String pageKey; + @JsonProperty("memberId") + final int memberId; + + @JsonProperty("pageKey") + public String getPageKey() { + return pageKey; + } + + @JsonProperty("memberId") + public int getMemberId() { + return memberId; + } + + @JsonCreator + public PageView(@JsonProperty("pageKey") String pageKey, @JsonProperty("memberId") int memberId) { + this.pageKey = pageKey; + this.memberId = memberId; + } + } + + public static class Profile implements Serializable { + @JsonProperty("memberId") + final int memberId; + + @JsonProperty("company") + final String company; + + @JsonProperty("memberId") + public int getMemberId() { + return memberId; + } + + @JsonProperty("company") + public String getCompany() { + return company; + } + + @JsonCreator + public Profile(@JsonProperty("memberId") int memberId, @JsonProperty("company") String company) { + this.memberId = memberId; + this.company = company; + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof Profile)) { + return false; + } + return ((Profile) o).getMemberId() == memberId; + } + + @Override + public int hashCode() { + return memberId; + } + } + + public static class EnrichedPageView extends PageView { + + @JsonProperty("company") + final String company; + + @JsonProperty("company") + public String getCompany() { + return company; + } + + @JsonCreator + public EnrichedPageView( + @JsonProperty("pageKey") String pageKey, + @JsonProperty("memberId") int memberId, + @JsonProperty("company") String company) { + super(pageKey, memberId); + this.company = company; + } + } + + public static class PageViewJsonSerdeFactory implements SerdeFactory<PageView> { + @Override public Serde<PageView> getSerde(String name, Config config) { + return new PageViewJsonSerde(); + } + } + + public static class ProfileJsonSerdeFactory implements SerdeFactory<Profile> { + @Override public Serde<Profile> getSerde(String name, Config config) { + return new ProfileJsonSerde(); + } + } + + public static class PageViewJsonSerde implements Serde<PageView> { + + @Override + public PageView fromBytes(byte[] bytes) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<PageView>() { }); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public byte[] toBytes(PageView pv) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(pv).getBytes("UTF-8"); + } catch (Exception e) { + throw new SamzaException(e); + } + } + } + + public static class ProfileJsonSerde implements Serde<Profile> { + + @Override + public Profile fromBytes(byte[] bytes) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<Profile>() { }); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public byte[] toBytes(Profile p) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(p).getBytes("UTF-8"); + } catch (Exception e) { + throw new SamzaException(e); + } + } + } + + private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"}; + + static public PageView[] generatePageViews(int count) { + Random random = new Random(); + PageView[] pageviews = new PageView[count]; + for (int i = 0; i < count; i++) { + String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)]; + int memberId = random.nextInt(10); + pageviews[i] = new PageView(pagekey, memberId); + } + return pageviews; + } + + private static final String[] COMPANIES = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"}; + + static public Profile[] generateProfiles(int count) { + Random random = new Random(); + Profile[] profiles = new Profile[count]; + for (int i = 0; i < count; i++) { + String company = COMPANIES[random.nextInt(COMPANIES.length - 1)]; + profiles[i] = new Profile(i, company); + } + return profiles; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java index 832457b..6ba28ae 100644 --- a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java +++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.samza.config.Config; import org.apache.samza.system.IncomingMessageEnvelope; @@ -58,9 +59,10 @@ public class ArraySystemConsumer implements SystemConsumer { public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long l) throws InterruptedException { if (!done) { Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = new HashMap<>(); + final AtomicInteger offset = new AtomicInteger(0); set.forEach(ssp -> { List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config)) - .map(object -> new IncomingMessageEnvelope(ssp, null, null, object)).collect(Collectors.toList()); + .map(object -> new IncomingMessageEnvelope(ssp, String.valueOf(offset.incrementAndGet()), null, object)).collect(Collectors.toList()); envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp)); envelopeMap.put(ssp, envelopes); }); http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java index 8890a2f..c735c74 100644 --- a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java +++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java @@ -24,7 +24,9 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; + import org.apache.samza.Partition; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; @@ -50,13 +52,17 @@ public class SimpleSystemAdmin implements SystemAdmin { @Override public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { return streamNames.stream() - .collect(Collectors.toMap( - Function.<String>identity(), - streamName -> { + .collect(Collectors.toMap(Function.identity(), streamName -> { + int messageCount = isBootstrapStream(streamName) ? getMessageCount(streamName) : -1; + String oldestOffset = messageCount < 0 ? null : "0"; + String newestOffset = messageCount < 0 ? null : String.valueOf(messageCount - 1); + String upcomingOffset = messageCount < 0 ? null : String.valueOf(messageCount); Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = new HashMap<>(); int partitionCount = config.getInt("streams." + streamName + ".partitionCount", 1); for (int i = 0; i < partitionCount; i++) { - metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)); + metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata( + oldestOffset, newestOffset, upcomingOffset + )); } return new SystemStreamMetadata(streamName, metadataMap); })); @@ -71,5 +77,17 @@ public class SimpleSystemAdmin implements SystemAdmin { } return offset1.compareTo(offset2); } + + private int getMessageCount(String streamName) { + try { + return Base64Serializer.deserialize(config.get("streams." + streamName + ".source"), Object[].class).length; + } catch (Exception e) { + throw new SamzaException(e); + } + } + + private boolean isBootstrapStream(String streamName) { + return "true".equalsIgnoreCase(config.get("streams." + streamName + ".samza.bootstrap", "false")); + } }
