This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new bad9ee8 [SAMZA-2666] Updating tests to use TestRunner framework
(#1510)
bad9ee8 is described below
commit bad9ee81bc065f6dffd55829335434c5d6dfe010
Author: Cameron Lee <[email protected]>
AuthorDate: Thu Aug 5 14:24:17 2021 -0700
[SAMZA-2666] Updating tests to use TestRunner framework (#1510)
---
.../EndOfStreamIntegrationTest.java | 94 +++++-------------
.../samza/test/operator/RepartitionWindowApp.java | 65 ------------
.../samza/test/operator/TestAsyncFlatMap.java | 94 +++++++++++++-----
.../test/operator/TestRepartitionWindowApp.java | 110 ++++++++++++---------
.../table/TestCouchbaseRemoteTableEndToEnd.java | 56 +++++------
.../TestLocalTableWithConfigRewriterEndToEnd.java | 37 ++++---
.../TestLocalTableWithLowLevelApiEndToEnd.java | 37 +++----
.../samza/test/table/TestRemoteTableEndToEnd.java | 64 ++++++------
.../table/TestRemoteTableWithBatchEndToEnd.java | 64 +++++-------
.../org/apache/samza/test/table/TestTableData.java | 4 +-
10 files changed, 269 insertions(+), 356 deletions(-)
diff --git
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index e53f701..e5b8673 100644
---
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -19,33 +19,22 @@
package org.apache.samza.test.controlmessages;
+import java.time.Duration;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.test.controlmessages.TestData.PageView;
-import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.test.table.TestTableData.PageView;
+import org.apache.samza.test.framework.TestRunner;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
+import org.apache.samza.test.table.TestTableData;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -54,49 +43,12 @@ import static org.junit.Assert.assertEquals;
* This test uses an array as a bounded input source, and does a partitionBy()
and sink() after reading the input.
* It verifies the pipeline will stop and the number of output messages should
equal to the input.
*/
-public class EndOfStreamIntegrationTest extends IntegrationTestHarness {
-
- private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk",
"group", "job"};
-
- private static List<PageView> received = new ArrayList<>();
+public class EndOfStreamIntegrationTest {
+ private static final List<PageView> RECEIVED = new ArrayList<>();
@Test
- public void testPipeline() throws Exception {
- Random random = new Random();
- int count = 10;
- PageView[] pageviews = new PageView[count];
- for (int i = 0; i < count; i++) {
- String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
- int memberId = random.nextInt(10);
- pageviews[i] = new PageView(pagekey, memberId);
- }
-
- int partitionCount = 4;
- Map<String, String> configs = new HashMap<>();
- configs.put(ApplicationConfig.APP_RUNNER_CLASS,
"org.apache.samza.runtime.LocalApplicationRunner");
- configs.put("systems.test.samza.factory",
ArraySystemFactory.class.getName());
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source",
Base64Serializer.serialize(pageviews));
- configs.put("streams.PageView.partitionCount",
String.valueOf(partitionCount));
-
- configs.put(JobConfig.JOB_NAME, "test-eos-job");
- configs.put(JobConfig.PROCESSOR_ID, "1");
- configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName());
- configs.put(TaskConfig.GROUPER_FACTORY,
SingleContainerGrouperFactory.class.getName());
-
- configs.put("systems.kafka.samza.factory",
"org.apache.samza.system.kafka.KafkaSystemFactory");
- configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
- configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
- configs.put("systems.kafka.samza.key.serde", "int");
- configs.put("systems.kafka.samza.msg.serde", "json");
- configs.put("systems.kafka.default.stream.replication.factor", "1");
- configs.put("job.default.system", "kafka");
-
- configs.put("serializers.registry.int.class",
"org.apache.samza.serializers.IntegerSerdeFactory");
- configs.put("serializers.registry.json.class",
PageViewJsonSerdeFactory.class.getName());
-
+ public void testPipeline() {
class PipelineApplication implements StreamApplication {
-
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
@@ -104,20 +56,22 @@ public class EndOfStreamIntegrationTest extends
IntegrationTestHarness {
sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(),
new NoOpSerde<>()));
appDescriptor.getInputStream(isd)
.map(KV::getValue)
- .partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new
NoOpSerde<>(), new NoOpSerde<>()), "p1")
+ .partitionBy(PageView::getMemberId, pv -> pv,
+ KVSerde.of(new IntegerSerde(), new
TestTableData.PageViewJsonSerde()), "p1")
.sink((m, collector, coordinator) -> {
- received.add(m.getValue());
+ RECEIVED.add(m.getValue());
});
}
}
- Config config = new MapConfig(configs);
- final ApplicationRunner runner =
ApplicationRunners.getApplicationRunner(new PipelineApplication(),
- config);
-
- executeRun(runner, config);
- runner.waitForFinish();
+ int numPageViews = 40;
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+ InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+ .getInputDescriptor("PageView", new NoOpSerde<>());
+ TestRunner.of(new PipelineApplication())
+ .addInputStream(inputDescriptor,
TestTableData.generatePartitionedPageViews(numPageViews, 4))
+ .run(Duration.ofSeconds(10));
- assertEquals(received.size(), count * partitionCount);
+ assertEquals(RECEIVED.size(), numPageViews);
}
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
deleted file mode 100644
index fe8e318..0000000
---
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.operator;
-
-import java.time.Duration;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
-import org.apache.samza.test.operator.data.PageView;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link StreamApplication} that demonstrates a repartition followed by a
windowed count.
- */
-public class RepartitionWindowApp implements StreamApplication {
-
- private static final Logger LOG =
LoggerFactory.getLogger(RepartitionWindowApp.class);
- static final String SYSTEM = "kafka";
- static final String INPUT_TOPIC = "page-views";
- static final String OUTPUT_TOPIC = "Result";
-
-
- @Override
- public void describe(StreamApplicationDescriptor appDescriptor) {
- KVSerde<String, PageView> inputSerde = KVSerde.of(new
StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
- KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new
StringSerde());
- KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
- KafkaInputDescriptor<KV<String, PageView>> id =
ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
- KafkaOutputDescriptor<KV<String, String>> od =
ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
-
- appDescriptor.getInputStream(id)
- .map(KV::getValue)
- .partitionBy(PageView::getUserId, m -> m, inputSerde, "p1")
- .window(Windows.keyedSessionWindow(m -> m.getKey(),
Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new
IntegerSerde()), "w1")
- .map(wp -> KV.of(wp.getKey().getKey().toString(),
String.valueOf(wp.getMessage())))
- .sendTo(appDescriptor.getOutputStream(od));
-
- }
-}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
index a60aea7..d5a5e7e 100644
---
a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
+++
b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
@@ -18,7 +18,6 @@
*/
package org.apache.samza.test.operator;
-import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
@@ -31,12 +30,12 @@ import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.NoOpSerde;
@@ -46,14 +45,15 @@ import org.apache.samza.test.framework.TestRunner;
import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
import
org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.apache.samza.test.harness.IntegrationTestHarness;
import org.apache.samza.test.operator.data.PageView;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
-public class TestAsyncFlatMap extends IntegrationTestHarness {
+public class TestAsyncFlatMap {
private static final String TEST_SYSTEM = "test";
private static final String PAGE_VIEW_STREAM = "test-async-page-view-stream";
private static final String NON_GUEST_PAGE_VIEW_STREAM =
"test-async-non-guest-page-view-stream";
@@ -68,66 +68,97 @@ public class TestAsyncFlatMap extends
IntegrationTestHarness {
new PageView("3", "profile-page", "0"),
new PageView("4", LOGIN_PAGE, "0"));
-
@Test
public void testProcessingFutureCompletesSuccessfully() {
List<PageView> expectedPageViews = PAGE_VIEWS.stream()
- .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) &&
Long.valueOf(pageView.getUserId()) > 0)
+ .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) &&
Long.parseLong(pageView.getUserId()) > 0)
.collect(Collectors.toList());
- List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+ List<PageView> actualPageViews = runTest(new HashMap<>());
assertEquals("Mismatch between expected vs actual page views",
expectedPageViews, actualPageViews);
}
- @Test(expected = SamzaException.class)
+ @Test
public void testProcessingFutureCompletesAfterTaskTimeout() {
Map<String, String> configs = new HashMap<>();
configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
configs.put(PROCESS_JITTER, "200");
- runTest(PAGE_VIEWS, configs);
+ try {
+ runTest(configs);
+ fail("App execution should have failed due to a task callback timeout");
+ } catch (SamzaException e) {
+ /*
+ * TestRunner throws SamzaException on failures in general, so check the
actual cause. The timeout message is
+ * nested within a bunch of other exceptions.
+ */
+ Throwable rootCause = findRootCause(e);
+ assertTrue(rootCause instanceof SamzaException);
+ // the "{}" is intentional, since the exception message actually
includes it (probably a logging bug)
+ assertEquals("Callback for task {} Partition 0 timed out after 100 ms.",
rootCause.getMessage());
+ }
}
- @Test(expected = RuntimeException.class)
+ @Test
public void testProcessingExceptionIsBubbledUp() {
Map<String, String> configs = new HashMap<>();
configs.put(FAIL_PROCESS, "true");
- runTest(PAGE_VIEWS, configs);
+ try {
+ runTest(configs);
+ fail("App execution should have failed due to a
ProcessFailureException");
+ } catch (SamzaException e) {
+ /*
+ * TestRunner throws SamzaException on failures in general, so check the
actual cause. The actual exception is
+ * nested within a bunch of other exceptions.
+ */
+ assertTrue(findRootCause(e) instanceof ProcessFailureException);
+ }
}
- @Test(expected = RuntimeException.class)
+ @Test
public void testDownstreamOperatorExceptionIsBubbledUp() {
Map<String, String> configs = new HashMap<>();
configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
- runTest(PAGE_VIEWS, configs);
+ try {
+ runTest(configs);
+ fail("App execution should have failed due to a FilterFailureException");
+ } catch (SamzaException e) {
+ /*
+ * TestRunner throws SamzaException on failures in general, so check the
actual cause. The actual exception is
+ * nested within a bunch of other exceptions.
+ */
+ assertTrue(findRootCause(e) instanceof FilterFailureException);
+ }
}
- private List<PageView> runTest(List<PageView> pageViews, Map<String, String>
configs) {
- configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID,
PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+ private List<PageView> runTest(Map<String, String> configs) {
InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
.getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
.getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
TestRunner
.of(new AsyncFlatMapExample())
- .addInputStream(pageViewStreamDesc, pageViews)
+ .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
.addOutputStream(outputStreamDesc, 1)
.addConfig(new MapConfig(configs))
- .run(Duration.ofMillis(50000));
+ .run(Duration.ofSeconds(10));
Map<Integer, List<PageView>> result =
TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
- List<PageView> results = result.values().stream()
+ return result.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
+ }
- return results;
+ private static Throwable findRootCause(Throwable e) {
+ Throwable currentException = e;
+ while (currentException.getCause() != null) {
+ currentException = currentException.getCause();
+ }
+ return currentException;
}
static class AsyncFlatMapExample implements StreamApplication {
@@ -158,11 +189,11 @@ public class TestAsyncFlatMap extends
IntegrationTestHarness {
System.out.println("Interrupted during sleep.");
}
- return Long.valueOf(pageView.getUserId()) < 1 ?
Collections.emptyList() : Collections.singleton(pageView);
+ return Long.parseLong(pageView.getUserId()) < 1 ?
Collections.emptyList() : Collections.singleton(pageView);
});
if (shouldFailProcess.test(pageView)) {
- filteredPageViews.completeExceptionally(new RuntimeException("Remote
service threw an exception"));
+ filteredPageViews.completeExceptionally(new
ProcessFailureException("Remote service threw an exception"));
}
return filteredPageViews;
@@ -170,11 +201,22 @@ public class TestAsyncFlatMap extends
IntegrationTestHarness {
private static boolean filterLoginPageViews(PageView pageView,
Predicate<PageView> shouldFailProcess) {
if (shouldFailProcess.test(pageView)) {
- throw new RuntimeException("Filtering login page views ran into an
exception");
+ throw new FilterFailureException("Filtering login page views ran into
an exception");
}
return !LOGIN_PAGE.equals(pageView.getPageId());
}
+ }
+ private static class ProcessFailureException extends RuntimeException {
+ public ProcessFailureException(String s) {
+ super(s);
+ }
+ }
+
+ private static class FilterFailureException extends RuntimeException {
+ public FilterFailureException(String s) {
+ super(s);
+ }
}
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 77ada4a..7add331 100644
---
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -18,69 +18,87 @@
*/
package org.apache.samza.test.operator;
-import java.util.Collections;
+import java.time.Duration;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
+import com.google.common.collect.ImmutableList;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
+import org.apache.samza.test.framework.StreamAssert;
+import org.apache.samza.test.framework.TestRunner;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.operator.data.PageView;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
import org.junit.Test;
-import static org.apache.samza.test.operator.RepartitionWindowApp.*;
/**
* Test driver for {@link RepartitionWindowApp}.
*/
-public class TestRepartitionWindowApp extends
StreamApplicationIntegrationTestHarness {
- private static final String APP_NAME = "PageViewCounterApp";
+public class TestRepartitionWindowApp {
+ private static final String SYSTEM = "test";
+ private static final String INPUT_TOPIC = "page-views";
+ private static final String OUTPUT_TOPIC = "Result";
@Test
public void testRepartitionedSessionWindowCounter() throws Exception {
- // create topics
- createTopic(INPUT_TOPIC, 3);
- createTopic(OUTPUT_TOPIC, 1);
+ Map<Integer, List<KV<String, PageView>>> pageViews = new HashMap<>();
+ pageViews.put(0, ImmutableList.of(KV.of("userId1", new PageView("india",
"5.com", "userId1")),
+ KV.of("userId1", new PageView("india", "2.com", "userId1"))));
+ pageViews.put(1, ImmutableList.of(KV.of("userId2", new PageView("china",
"4.com", "userId2")),
+ KV.of("userId1", new PageView("india", "3.com", "userId1"))));
+ pageViews.put(2, ImmutableList.of(KV.of("userId1", new PageView("india",
"1.com", "userId1"))));
- // produce messages to different partitions.
- ObjectMapper mapper = new ObjectMapper();
- PageView pv = new PageView("india", "5.com", "userId1");
- produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
- pv = new PageView("china", "4.com", "userId2");
- produceMessage(INPUT_TOPIC, 1, "userId2", mapper.writeValueAsString(pv));
- pv = new PageView("india", "1.com", "userId1");
- produceMessage(INPUT_TOPIC, 2, "userId1", mapper.writeValueAsString(pv));
- pv = new PageView("india", "2.com", "userId1");
- produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
- pv = new PageView("india", "3.com", "userId1");
- produceMessage(INPUT_TOPIC, 1, "userId1", mapper.writeValueAsString(pv));
+ InMemorySystemDescriptor sd = new InMemorySystemDescriptor(SYSTEM);
+ InMemoryInputDescriptor<KV<String, PageView>> inputDescriptor =
+ sd.getInputDescriptor(INPUT_TOPIC, KVSerde.of(new NoOpSerde<>(), new
NoOpSerde<>()));
+ /*
+ * Technically, this should have a message type of KV, because a KV is
passed to sendTo, but
+ * StreamAssert.containsInAnyOrder requires the type to match the output
type of the actual messages. In
+ * high-level, sendTo splits up the KV, so the actual messages are just
the "V" part of the KV.
+ * TestRunner only uses NoOpSerde anyways, so it doesn't matter if the
typing isn't KV.
+ */
+ InMemoryOutputDescriptor<String> outputDescriptor =
sd.getOutputDescriptor(OUTPUT_TOPIC, new NoOpSerde<>());
+ TestRunner.of(new RepartitionWindowApp())
+ .addInputStream(inputDescriptor, pageViews)
+ .addOutputStream(outputDescriptor, 1)
+ .addConfig("task.window.ms", "1000")
+ .run(Duration.ofSeconds(10));
- Map<String, String> configs = new HashMap<>();
- configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
- configs.put(JobConfig.PROCESSOR_ID, "0");
- configs.put(TaskConfig.GROUPER_FACTORY,
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-
- // run the application
- runApplication(new RepartitionWindowApp(), APP_NAME, configs);
+ StreamAssert.containsInAnyOrder(Arrays.asList("userId1 4", "userId2 1"),
outputDescriptor,
+ Duration.ofSeconds(1));
+ }
- // consume and validate result
- List<ConsumerRecord<String, String>> messages =
consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
- Assert.assertEquals(messages.size(), 2);
+ /**
+ * A {@link StreamApplication} that demonstrates a repartition followed by a
windowed count.
+ */
+ private static class RepartitionWindowApp implements StreamApplication {
+ @Override
+ public void describe(StreamApplicationDescriptor appDescriptor) {
+ DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM);
+ GenericInputDescriptor<KV<String, PageView>> inputDescriptor =
+ sd.getInputDescriptor(INPUT_TOPIC, KVSerde.of(new NoOpSerde<>(), new
NoOpSerde<>()));
+ GenericOutputDescriptor<KV<String, String>> outputDescriptor =
+ sd.getOutputDescriptor(OUTPUT_TOPIC, KVSerde.of(new NoOpSerde<>(),
new NoOpSerde<>()));
- for (ConsumerRecord<String, String> message : messages) {
- String key = message.key();
- String value = message.value();
- // Assert that there are 4 messages for userId1 and 1 message for
userId2.
- Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
- if ("userId1".equals(key)) {
- Assert.assertEquals(value, "4");
- } else {
- Assert.assertEquals(value, "1");
- }
+ appDescriptor.getInputStream(inputDescriptor)
+ .map(KV::getValue)
+ .partitionBy(PageView::getUserId, m -> m, KVSerde.of(new
NoOpSerde<>(), new NoOpSerde<>()), "p1")
+ .window(Windows.keyedSessionWindow(KV::getKey,
Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new
IntegerSerde()), "w1")
+ .map(wp -> KV.of(wp.getKey().getKey(), wp.getKey().getKey() + " " +
wp.getMessage()))
+ .sendTo(appDescriptor.getOutputStream(outputDescriptor));
}
}
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java
index 834c356..1d52bdc 100644
---
a/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java
@@ -29,16 +29,14 @@ import
com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.mock.BucketConfiguration;
import com.couchbase.mock.CouchbaseMock;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
@@ -48,8 +46,9 @@ import
org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.remote.NoOpTableReadFunction;
import org.apache.samza.table.remote.couchbase.CouchbaseTableReadFunction;
import org.apache.samza.table.remote.couchbase.CouchbaseTableWriteFunction;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.test.framework.TestRunner;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.junit.After;
import org.junit.Assert;
@@ -64,14 +63,14 @@ import org.junit.Test;
* CouchbaseMock library, closing a mocked bucket would throw exceptions like:
java.lang.ArithmeticException: / by zero.
* Please ignore those exception.
*/
-public class TestCouchbaseRemoteTableEndToEnd extends IntegrationTestHarness {
- protected CouchbaseEnvironment couchbaseEnvironment;
- protected CouchbaseMock couchbaseMock;
- protected Cluster cluster;
- protected String inputBucketName = "inputBucket";
- protected String outputBucketName = "outputBucket";
-
- protected void createMockBuckets(List<String> bucketNames) throws Exception {
+public class TestCouchbaseRemoteTableEndToEnd {
+ private CouchbaseEnvironment couchbaseEnvironment;
+ private CouchbaseMock couchbaseMock;
+ private Cluster cluster;
+ private String inputBucketName = "inputBucket";
+ private String outputBucketName = "outputBucket";
+
+ private void createMockBuckets(List<String> bucketNames) throws Exception {
ArrayList<BucketConfiguration> configList = new ArrayList<>();
bucketNames.forEach(name -> configList.add(configBucket(name)));
couchbaseMock = new CouchbaseMock(0, configList);
@@ -79,7 +78,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends
IntegrationTestHarness {
couchbaseMock.waitForStartup();
}
- protected BucketConfiguration configBucket(String bucketName) {
+ private BucketConfiguration configBucket(String bucketName) {
BucketConfiguration config = new BucketConfiguration();
config.numNodes = 1;
config.numReplicas = 1;
@@ -87,7 +86,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends
IntegrationTestHarness {
return config;
}
- protected void initClient() {
+ private void initClient() {
couchbaseEnvironment = DefaultCouchbaseEnvironment.builder()
.bootstrapCarrierDirectPort(couchbaseMock.getCarrierPort("inputBucket"))
.bootstrapHttpDirectPort(couchbaseMock.getHttpPort())
@@ -110,8 +109,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends
IntegrationTestHarness {
}
@Test
- public void testEndToEnd() throws Exception {
-
+ public void testEndToEnd() {
Bucket inputBucket = cluster.openBucket(inputBucketName);
inputBucket.upsert(ByteArrayDocument.create("Alice", "20".getBytes()));
inputBucket.upsert(ByteArrayDocument.create("Bob", "30".getBytes()));
@@ -119,15 +117,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends
IntegrationTestHarness {
inputBucket.upsert(ByteArrayDocument.create("David", "50".getBytes()));
inputBucket.close();
- String[] users = new String[]{"Alice", "Bob", "Chris", "David"};
-
- int partitionCount = 1;
- Map<String, String> configs =
TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.User.samza.system", "test");
- configs.put("streams.User.source", Base64Serializer.serialize(users));
- configs.put("streams.User.partitionCount", String.valueOf(partitionCount));
- Config config = new MapConfig(configs);
+ List<String> users = Arrays.asList("Alice", "Bob", "Chris", "David");
final StreamApplication app = appDesc -> {
DelegatingSystemDescriptor inputSystemDescriptor = new
DelegatingSystemDescriptor("test");
@@ -162,9 +152,12 @@ public class TestCouchbaseRemoteTableEndToEnd extends
IntegrationTestHarness {
.sendTo(outputTable);
};
- final LocalApplicationRunner runner = new LocalApplicationRunner(app,
config);
- executeRun(runner, config);
- runner.waitForFinish();
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+ InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+ .getInputDescriptor("User", new NoOpSerde<>());
+ TestRunner.of(app)
+ .addInputStream(inputDescriptor, users)
+ .run(Duration.ofSeconds(10));
Bucket outputBucket = cluster.openBucket(outputBucketName);
Assert.assertEquals("{\"name\":\"Alice\",\"age\":\"20\"}",
outputBucket.get("Alice").content().toString());
@@ -192,5 +185,4 @@ public class TestCouchbaseRemoteTableEndToEnd extends
IntegrationTestHarness {
return record.getKey();
}
}
-
-}
+}
\ No newline at end of file
diff --git
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
index c84945d..6c1f38c 100644
---
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
@@ -19,6 +19,7 @@
package org.apache.samza.test.table;
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -28,7 +29,6 @@ import
org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigRewriter;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
@@ -39,31 +39,30 @@ import
org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.table.TableConfigGenerator;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.test.framework.TestRunner;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.junit.Test;
-import static
org.apache.samza.test.table.TestLocalTableEndToEnd.getBaseJobConfig;
import static
org.apache.samza.test.table.TestLocalTableWithLowLevelApiEndToEnd.MyStreamTask;
-public class TestLocalTableWithConfigRewriterEndToEnd extends
IntegrationTestHarness {
-
+public class TestLocalTableWithConfigRewriterEndToEnd {
+ /**
+ * MyTaskApplication does not include table descriptor, so if the rewriter
does not add the table configs properly,
+ * then the application will fail to execute.
+ */
@Test
- public void testWithConfigRewriter() throws Exception {
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(),
zkConnect());
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source",
Base64Serializer.serialize(TestTableData.generatePageViews(10)));
- configs.put("streams.PageView.partitionCount", String.valueOf(4));
- configs.put("task.inputs", "test.PageView");
- configs.put("job.config.rewriter.my-rewriter.class",
MyConfigRewriter.class.getName());
- configs.put("job.config.rewriters", "my-rewriter");
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new
MyTaskApplication(), config);
- executeRun(runner, config);
- runner.waitForFinish();
+ public void testWithConfigRewriter() {
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+ InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+ .getInputDescriptor("PageView", new NoOpSerde<>());
+ TestRunner.of(new MyTaskApplication())
+ .addInputStream(inputDescriptor,
TestTableData.generatePartitionedPageViews(40, 4))
+ .addConfig("job.config.rewriters", "my-rewriter")
+ .addConfig("job.config.rewriter.my-rewriter.class",
MyConfigRewriter.class.getName())
+ .run(Duration.ofSeconds(10));
}
static public class MyConfigRewriter implements ConfigRewriter {
diff --git
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
index 164e74b..734b0ba 100644
---
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
@@ -19,14 +19,10 @@
package org.apache.samza.test.table;
-import java.util.Map;
-
+import java.time.Duration;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.context.Context;
-import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
@@ -40,29 +36,24 @@ import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
-
+import org.apache.samza.test.framework.TestRunner;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.junit.Assert;
import org.junit.Test;
-import static
org.apache.samza.test.table.TestLocalTableEndToEnd.getBaseJobConfig;
-
-
-public class TestLocalTableWithLowLevelApiEndToEnd extends
IntegrationTestHarness {
+public class TestLocalTableWithLowLevelApiEndToEnd {
@Test
- public void testTableWithLowLevelApi() throws Exception {
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(),
zkConnect());
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source",
Base64Serializer.serialize(TestTableData.generatePageViews(10)));
- configs.put("streams.PageView.partitionCount", String.valueOf(4));
- configs.put("task.inputs", "test.PageView");
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new
MyTaskApplication(), config);
- executeRun(runner, config);
- runner.waitForFinish();
+ public void testTableWithLowLevelApi() {
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+ InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+ .getInputDescriptor("PageView", new NoOpSerde<>());
+ TestRunner.of(new
TestLocalTableWithConfigRewriterEndToEnd.MyTaskApplication())
+ .addInputStream(inputDescriptor,
TestTableData.generatePartitionedPageViews(40, 4))
+ .addConfig("job.config.rewriters", "my-rewriter")
+ .addConfig("job.config.rewriter.my-rewriter.class",
TestLocalTableWithConfigRewriterEndToEnd.MyConfigRewriter.class.getName())
+ .run(Duration.ofSeconds(10));
}
static public class MyTaskApplication implements TaskApplication {
diff --git
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
index 2b89ba9..6f17a10 100644
---
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
@@ -39,7 +39,6 @@ import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.Context;
import org.apache.samza.context.MockContext;
@@ -53,7 +52,6 @@ import org.apache.samza.operators.KV;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.table.Table;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
@@ -65,7 +63,9 @@ import
org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.util.Base64Serializer;
import org.apache.samza.util.RateLimiter;
@@ -75,7 +75,6 @@ import org.junit.Test;
import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
import static org.apache.samza.test.table.TestTableData.PageView;
import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.generatePageViews;
import static org.apache.samza.test.table.TestTableData.generateProfiles;
import static org.mockito.Matchers.any;
@@ -83,10 +82,9 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
-public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
-
- static Map<String, AtomicInteger> counters = new HashMap<>();
- static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+public class TestRemoteTableEndToEnd {
+ private static final Map<String, AtomicInteger> COUNTERS = new HashMap<>();
+ private static final Map<String, List<EnrichedPageView>> WRITTEN_RECORDS =
new HashMap<>();
static class InMemoryProfileReadFunction extends BaseTableFunction
implements TableReadFunction<Integer, Profile> {
@@ -144,7 +142,7 @@ public class TestRemoteTableEndToEnd extends
IntegrationTestHarness {
in.defaultReadObject();
// Write to the global list for verification
- records = writtenRecords.get(testName);
+ records = WRITTEN_RECORDS.get(testName);
}
@Override
@@ -189,7 +187,7 @@ public class TestRemoteTableEndToEnd extends
IntegrationTestHarness {
public CompletableFuture readAsync(int opId, Object... args) {
if (1 == opId) {
boolean shouldReturnValue = (boolean) args[0];
- AtomicInteger counter = counters.get(testName);
+ AtomicInteger counter = COUNTERS.get(testName);
Integer result = shouldReturnValue ? counter.get() : null;
return CompletableFuture.completedFuture(result);
} else {
@@ -230,7 +228,7 @@ public class TestRemoteTableEndToEnd extends
IntegrationTestHarness {
@Override
public CompletableFuture writeAsync(int opId, Object... args) {
Integer result;
- AtomicInteger counter = counters.get(testName);
+ AtomicInteger counter = COUNTERS.get(testName);
boolean shouldModify = (boolean) args[0];
switch (opId) {
case 1:
@@ -251,7 +249,7 @@ public class TestRemoteTableEndToEnd extends
IntegrationTestHarness {
}
}
- static private class TestReadWriteMapFunction implements
MapFunction<PageView, PageView> {
+ private static class TestReadWriteMapFunction implements
MapFunction<PageView, PageView> {
private final String counterTableName;
private ReadWriteTable counterTable;
@@ -323,20 +321,12 @@ public class TestRemoteTableEndToEnd extends
IntegrationTestHarness {
return appDesc.getTable(cachingDesc);
}
- private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean
defaultCache, boolean withArgs, String testName)
- throws Exception {
-
- writtenRecords.put(testName, new ArrayList<>());
-
- int count = 10;
- final PageView[] pageViews = generatePageViews(count);
- final String profiles =
Base64Serializer.serialize(generateProfiles(count));
+ private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean
defaultCache, boolean withArgs,
+ String testName) throws Exception {
+ WRITTEN_RECORDS.put(testName, new ArrayList<>());
- final int partitionCount = 4;
- final Map<String, String> configs =
TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source",
Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView.partitionCount",
String.valueOf(partitionCount));
+ // max member id for page views is 10
+ final String profiles = Base64Serializer.serialize(generateProfiles(10));
final RateLimiter readRateLimiter = mock(RateLimiter.class,
withSettings().serializable());
final TableRateLimiter.CreditFunction creditFunction = (k, v, args) -> 1;
@@ -373,7 +363,7 @@ public class TestRemoteTableEndToEnd extends
IntegrationTestHarness {
.sendTo(outputTable);
} else {
- counters.put(testName, new AtomicInteger());
+ COUNTERS.put(testName, new AtomicInteger());
final RemoteTableDescriptor counterTableDesc =
new RemoteTableDescriptor("counter-table-1")
@@ -395,19 +385,21 @@ public class TestRemoteTableEndToEnd extends
IntegrationTestHarness {
}
};
- final Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(app,
config);
- executeRun(runner, config);
- runner.waitForFinish();
+ int numPageViews = 40;
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+ InMemoryInputDescriptor<PageView> inputDescriptor = isd
+ .getInputDescriptor("PageView", new NoOpSerde<>());
+ TestRunner.of(app)
+ .addInputStream(inputDescriptor,
TestTableData.generatePartitionedPageViews(numPageViews, 4))
+ .run(Duration.ofSeconds(10));
- final int numExpected = count * partitionCount;
- Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
- Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof
EnrichedPageView);
+ Assert.assertEquals(numPageViews, WRITTEN_RECORDS.get(testName).size());
+ Assert.assertNotNull(WRITTEN_RECORDS.get(testName).get(0));
if (!withArgs) {
- writtenRecords.get(testName).forEach(epv ->
Assert.assertFalse(epv.company.contains("-")));
+ WRITTEN_RECORDS.get(testName).forEach(epv ->
Assert.assertFalse(epv.company.contains("-")));
} else {
- writtenRecords.get(testName).forEach(epv ->
Assert.assertTrue(epv.company.endsWith("-r-w")));
- Assert.assertEquals(numExpected, counters.get(testName).get());
+ WRITTEN_RECORDS.get(testName).forEach(epv ->
Assert.assertTrue(epv.company.endsWith("-r-w")));
+ Assert.assertEquals(numPageViews, COUNTERS.get(testName).get());
}
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
index 3cad14e..e37bd86 100644
---
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
@@ -33,10 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.KV;
-import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
@@ -48,7 +45,9 @@ import org.apache.samza.table.remote.BaseTableFunction;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.util.Base64Serializer;
import org.apache.samza.util.RateLimiter;
import org.junit.Assert;
@@ -58,11 +57,10 @@ import static org.apache.samza.test.table.TestTableData.*;
import static org.mockito.Mockito.*;
-public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
-
- static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
- static Map<String, AtomicInteger> batchWrites = new HashMap<>();
- static Map<String, AtomicInteger> batchReads = new HashMap<>();
+public class TestRemoteTableWithBatchEndToEnd {
+ private static final Map<String, List<EnrichedPageView>> WRITTEN_RECORDS =
new HashMap<>();
+ private static final Map<String, AtomicInteger> BATCH_WRITES = new
HashMap<>();
+ private static final Map<String, AtomicInteger> BATCH_READS = new
HashMap<>();
static class InMemoryReadFunction extends BaseTableFunction
implements TableReadFunction<Integer, Profile> {
@@ -80,7 +78,7 @@ public class TestRemoteTableWithBatchEndToEnd extends
IntegrationTestHarness {
in.defaultReadObject();
Profile[] profiles =
Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p ->
p.getMemberId(), Function.identity()));
- batchReadCounter = batchReads.get(testName);
+ batchReadCounter = BATCH_READS.get(testName);
}
@Override
@@ -119,8 +117,8 @@ public class TestRemoteTableWithBatchEndToEnd extends
IntegrationTestHarness {
in.defaultReadObject();
// Write to the global list for verification
- records = writtenRecords.get(testName);
- batchWritesCounter = batchWrites.get(testName);
+ records = WRITTEN_RECORDS.get(testName);
+ batchWritesCounter = BATCH_WRITES.get(testName);
}
@Override
@@ -147,7 +145,6 @@ public class TestRemoteTableWithBatchEndToEnd extends
IntegrationTestHarness {
}
}
-
static class MyReadFunction extends BaseTableFunction
implements TableReadFunction {
@Override
@@ -164,24 +161,14 @@ public class TestRemoteTableWithBatchEndToEnd extends
IntegrationTestHarness {
private void doTestStreamTableJoinRemoteTable(String testName, boolean
batchRead, boolean batchWrite) throws Exception {
final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
- batchReads.put(testName, new AtomicInteger());
- batchWrites.put(testName, new AtomicInteger());
- writtenRecords.put(testName, new CopyOnWriteArrayList<>());
+ BATCH_READS.put(testName, new AtomicInteger());
+ BATCH_WRITES.put(testName, new AtomicInteger());
+ WRITTEN_RECORDS.put(testName, new CopyOnWriteArrayList<>());
- final int count = 16;
- final int batchSize = 4;
- PageView[] pageViews = generatePageViewsWithDistinctKeys(count);
+ int count = 16;
+ int batchSize = 4;
String profiles = Base64Serializer.serialize(generateProfiles(count));
- int partitionCount = 1;
- Map<String, String> configs =
TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source",
Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView.partitionCount",
String.valueOf(partitionCount));
- configs.put("task.max.concurrency", String.valueOf(count));
- configs.put("task.async.commit", String.valueOf(true));
-
final RateLimiter readRateLimiter = mock(RateLimiter.class,
withSettings().serializable());
final RateLimiter writeRateLimiter = mock(RateLimiter.class,
withSettings().serializable());
final TableRateLimiter.CreditFunction creditFunction = (k, v, args)->1;
@@ -219,20 +206,23 @@ public class TestRemoteTableWithBatchEndToEnd extends
IntegrationTestHarness {
.sendTo(outputTable);
};
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(app,
config);
- executeRun(runner, config);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+ InMemoryInputDescriptor<PageView> inputDescriptor = isd
+ .getInputDescriptor("PageView", new NoOpSerde<>());
+ TestRunner.of(app)
+ .addInputStream(inputDescriptor,
Arrays.asList(generatePageViewsWithDistinctKeys(count)))
+ .addConfig("task.max.concurrency", String.valueOf(count))
+ .addConfig("task.async.commit", String.valueOf(true))
+ .run(Duration.ofSeconds(10));
- runner.waitForFinish();
- int numExpected = count * partitionCount;
- Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
- Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof
EnrichedPageView);
+ Assert.assertEquals(count, WRITTEN_RECORDS.get(testName).size());
+ Assert.assertNotNull(WRITTEN_RECORDS.get(testName).get(0));
if (batchRead) {
- Assert.assertEquals(numExpected / batchSize,
batchReads.get(testName).get());
+ Assert.assertEquals(count / batchSize, BATCH_READS.get(testName).get());
}
if (batchWrite) {
- Assert.assertEquals(numExpected / batchSize,
batchWrites.get(testName).get());
+ Assert.assertEquals(count / batchSize, BATCH_WRITES.get(testName).get());
}
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index 39f9b02..f7d74a6 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -244,7 +244,7 @@ public class TestTableData {
return pageViews;
}
- static public PageView[] generatePageViewsWithDistinctKeys(int count) {
+ public static PageView[] generatePageViewsWithDistinctKeys(int count) {
Random random = new Random();
PageView[] pageviews = new PageView[count];
for (int i = 0; i < count; i++) {
@@ -256,7 +256,7 @@ public class TestTableData {
private static final String[] COMPANIES = {"MSFT", "LKND", "GOOG", "FB",
"AMZN", "CSCO"};
- static public Profile[] generateProfiles(int count) {
+ public static Profile[] generateProfiles(int count) {
Random random = new Random();
Profile[] profiles = new Profile[count];
for (int i = 0; i < count; i++) {