Repository: samza Updated Branches: refs/heads/master a4b831d3d -> 01ee053ed
Revert "SAMZA-704 : Create a tool to write coordinator stream" This reverts commit a4b831d3d3a555bc3cca2b0819813c6fad8bd480. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/01ee053e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/01ee053e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/01ee053e Branch: refs/heads/master Commit: 01ee053ed720832a1ac679525f1aa2b29fe0463e Parents: a4b831d Author: Navina <[email protected]> Authored: Thu Jul 16 15:07:54 2015 -0700 Committer: Navina <[email protected]> Committed: Thu Jul 16 15:07:54 2015 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 - checkstyle/import-control.xml.orig | 183 ------------------- .../stream/CoordinatorStreamMessage.java | 8 +- .../stream/CoordinatorStreamWriter.java | 128 ------------- .../CoordinatorStreamWriterCommandLine.scala | 71 ------- .../MockCoordinatorStreamSystemFactory.java | 90 +++------ .../TestCoordinatorStreamSystemProducer.java | 59 +++++- .../stream/TestCoordinatorStreamWriter.java | 166 ----------------- .../main/bash/run-coordinator-stream-writer.sh | 21 --- 9 files changed, 74 insertions(+), 653 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 6654319..eef3370 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -123,7 +123,6 @@ <allow class="org.apache.samza.Partition" /> <allow class="org.apache.samza.SamzaException" /> - <allow class="joptsimple.OptionSet" /> </subpackage> <subpackage name="checkpoint"> http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/checkstyle/import-control.xml.orig ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml.orig b/checkstyle/import-control.xml.orig deleted file mode 100644 index eef3370..0000000 --- a/checkstyle/import-control.xml.orig +++ /dev/null @@ -1,183 +0,0 @@ -<!DOCTYPE import-control PUBLIC - "-//Puppy Crawl//DTD Import Control 1.1//EN" - "http://www.puppycrawl.com/dtds/import_control_1_1.dtd"> -<!-- -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. ---> -<import-control pkg="org.apache.samza"> - - <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE --> - - <!-- common library dependencies --> - <allow pkg="java" /> - <allow pkg="javax.management" /> - <allow pkg="org.slf4j" /> - <allow pkg="org.junit" /> - <allow pkg="org.codehaus" /> - <allow pkg="org.mockito" /> - <allow pkg="org.apache.log4j" /> - <allow pkg="org.apache.kafka" /> - - <subpackage name="config"> - <allow class="org.apache.samza.SamzaException" /> - </subpackage> - - <subpackage name="serializers"> - <allow pkg="org.apache.samza.config" /> - - <subpackage name="model"> - <allow pkg="org.apache.samza.job.model" /> - <allow pkg="org.apache.samza.util" /> - - <allow class="org.apache.samza.Partition" /> - <allow class="org.apache.samza.container.TaskName" /> - <allow class="org.apache.samza.system.SystemStreamPartition" /> - </subpackage> - </subpackage> - - <subpackage name="job"> - <allow pkg="org.apache.samza.config" /> - - <subpackage name="model"> - <allow class="org.apache.samza.Partition" /> - <allow class="org.apache.samza.container.TaskName" /> - <allow class="org.apache.samza.system.SystemStreamPartition" /> - <allow class="org.apache.samza.container.LocalityManager" /> - </subpackage> - </subpackage> - - <subpackage name="system"> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.metrics" /> - <allow pkg="org.apache.samza.serializers" /> - - <allow class="org.apache.samza.Partition" /> - <allow class="org.apache.samza.SamzaException" /> - - <subpackage name="chooser"> - <allow class="org.apache.samza.system.SystemStreamPartition" /> - <allow class="org.apache.samza.system.IncomingMessageEnvelope" /> - </subpackage> - - <subpackage name="mock"> - <allow pkg="org.apache.samza.system" /> - <allow pkg="org.apache.samza.util" /> - </subpackage> - </subpackage> - - <subpackage name="util"> - <allow pkg="org.apache.samza.metrics" /> - <allow pkg="org.apache.samza.system" /> - - <allow class="org.apache.samza.Partition" /> - <allow class="org.apache.samza.SamzaException" /> - </subpackage> - - <subpackage name="metrics"> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.util" /> - </subpackage> - - <subpackage name="task"> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.container" /> - <allow pkg="org.apache.samza.metrics" /> - <allow pkg="org.apache.samza.system" /> - </subpackage> - - <subpackage name="container"> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.coordinator.stream" /> - <subpackage name="grouper"> - <subpackage name="stream"> - <allow pkg="org.apache.samza.container" /> - <allow pkg="org.apache.samza.system" /> - </subpackage> - - <subpackage name="task"> - <allow pkg="org.apache.samza.job" /> - </subpackage> - </subpackage> - </subpackage> - - <subpackage name="coordinator"> - <allow pkg="org.apache.samza.checkpoint" /> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.metrics" /> - <allow pkg="org.apache.samza.system" /> - <allow pkg="org.apache.samza.serializers" /> - <allow pkg="org.apache.samza.util" /> - - <allow class="org.apache.samza.Partition" /> - <allow class="org.apache.samza.SamzaException" /> - </subpackage> - - <subpackage name="checkpoint"> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.container" /> - <allow pkg="org.apache.samza.coordinator" /> - <allow pkg="org.apache.samza.metrics" /> - <allow pkg="org.apache.samza.system" /> - - <allow class="org.apache.samza.SamzaException" /> - </subpackage> - - <subpackage name="storage"> - <allow pkg="org.apache.samza.container" /> - <allow pkg="org.apache.samza.coordinator" /> - <allow pkg="org.apache.samza.metrics" /> - <allow pkg="org.apache.samza.serializers" /> - <allow pkg="org.apache.samza.system" /> - <allow pkg="org.apache.samza.task" /> - <allow pkg="org.apache.samza.util" /> - <allow pkg="org.apache.samza.job" /> - <allow pkg="org.apache.samza.config" /> - <allow pkg="joptsimple" /> - - <allow class="org.apache.samza.SamzaException" /> - <allow class="org.apache.samza.Partition" /> - </subpackage> - - <subpackage name="logging"> - <subpackage name="log4j"> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.coordinator" /> - <allow pkg="org.apache.samza.job" /> - <allow pkg="org.apache.samza.metrics" /> - <allow pkg="org.apache.samza.system" /> - <allow pkg="org.apache.samza.serializers" /> - <allow pkg="org.apache.samza.util" /> - - <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory" /> - <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde" /> - <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory" /> - <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde" /> - <allow class="org.apache.samza.SamzaException" /> - </subpackage> - </subpackage> - - <subpackage name="test"> - <subpackage name="integration"> - <allow pkg="org.apache.samza.config" /> - <allow pkg="org.apache.samza.container" /> - <allow pkg="org.apache.samza.system" /> - <allow pkg="org.apache.samza.storage" /> - <allow pkg="org.apache.samza.task" /> - <allow pkg="org.apache.samza.util" /> - </subpackage> - </subpackage> - -</import-control> http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java index e5ab4fb..6bd1bd3 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java @@ -211,7 +211,7 @@ public class CoordinatorStreamMessage { /** * The type of the message is used to convert a generic - * CoordinatorStreamMessage into a specific message, such as a SetConfig + * CoordinatorStreaMessage into a specific message, such as a SetConfig * message. * * @return The type of the message. @@ -235,14 +235,14 @@ public class CoordinatorStreamMessage { } /** - * @return The username of a message. + * @return Whether the message signifies a delete or not. */ public String getUsername() { return (String) this.messageMap.get("username"); } /** - * @return The timestamp of a message. + * @return Whether the message signifies a delete or not. */ public long getTimestamp() { return (Long) this.messageMap.get("timestamp"); @@ -254,7 +254,7 @@ public class CoordinatorStreamMessage { public Map<String, Object> getMessageMap() { if (!isDelete) { Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap); - // To make sure the values is immutable, we overwrite it with an immutable version of the the values map. + // To make sure the values is not immutable, we overwrite it with an immutable version of the the values map. immutableMap.put("values", Collections.unmodifiableMap(getMessageValues())); return Collections.unmodifiableMap(immutableMap); } else { http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java deleted file mode 100644 index f769756..0000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.coordinator.stream; - -import joptsimple.OptionSet; -import org.apache.samza.config.Config; -import org.apache.samza.metrics.MetricsRegistryMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * This class writes control messages to the CoordinatorStream. - * To use this class it, first, it should be initialized by the start() method, - * and then use the sendMessage() function to send all the control messages needed. - * Finally, the stop() method should be called. - * The control messages are in the format of a (type, key, value) where: - * type: defines the kind of message of the control message from the set {set-config}. - * key: defines a key to associate with the value. This can be null as well for messages with no value - * value: defines the value being sent along with the message. This can be null as well for messages with no value. - */ -public class CoordinatorStreamWriter { - - private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class); - public final static String SOURCE = "coordinator-stream-writer"; - public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE; - - private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer; - - - public CoordinatorStreamWriter(Config config) { - coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap()); - } - - /** - * This method initializes the writer by starting a coordinator stream producer. - */ - public void start() { - coordinatorStreamSystemProducer.register(CoordinatorStreamWriter.SOURCE); - coordinatorStreamSystemProducer.start(); - log.info("Started coordinator stream writer."); - } - - /** - * This method stops the writer and closes the coordinator stream producer - */ - public void stop() { - log.info("Stopping the coordinator stream producer."); - coordinatorStreamSystemProducer.stop(); - } - - /** - * This method sends a message to the coordinator stream. This creates a message containing (type,key,value). - * For example if you want to set the number of yarn containers to 3, you would use - * ("set-config", "yarn.container.count", "3"). - * - * @param type defines the kind of message of the control message from the set {"set-config"}. - * @param key defines a key to associate with the value. This can be null for messages with no key or value. - * @param value defines the value being sent along with the message. This can be null for messages with no value. - */ - public void sendMessage(String type, String key, String value) { - //TODO: validate keys and values - if (type.equals(SET_CONFIG_TYPE)) { - sendSetConfigMessage(key, value); - } else { - throw new IllegalArgumentException("Type is invalid. The possible values are {" + SET_CONFIG_TYPE + "}"); - } - } - - /** - * This method sends message of type "set-config" to the coordinator stream - * - * @param key defines the name of the configuration being set. For example, for setting the number of yarn containers, - * the key is "yarn.container.count" - * @param value defines the value associated with the key. For example, if the key is "yarn.container.count" the value - * is the new number of containers. - */ - private void sendSetConfigMessage(String key, String value) { - log.info("sent SetConfig message with key = " + key + " and value = " + value); - coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value)); - } - - /** - * Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter - * and sends control messages. - * To run the code use the following command: - * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config-factory={config-factory} --config-path={path to config file of a job} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message} - * - * @param args input arguments for running the writer. These arguments are: - * "config-factory" = The config file factory - * "config-path" = The path to config file of a job - * "type" = type of the message being written - * "key" = [optional] key of the message being written - * "value" = [optional] value of the message being written - */ - public static void main(String[] args) { - CoordinatorStreamWriterCommandLine cmdline = new CoordinatorStreamWriterCommandLine(); - OptionSet options = cmdline.parser().parse(args); - Config config = cmdline.loadConfig(options); - String type = cmdline.loadType(options); - String key = cmdline.loadKey(options); - String value = cmdline.loadValue(options); - - CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config); - writer.start(); - writer.sendMessage(type, key, value); - writer.stop(); - } - -} - http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala deleted file mode 100644 index 0c17800..0000000 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.coordinator.stream - -import org.apache.samza.util.CommandLine -import joptsimple.OptionSet - -class CoordinatorStreamWriterCommandLine extends CommandLine { - - val messageType = - parser.accepts("type", "the type of the message being sent.") - .withRequiredArg - .ofType(classOf[java.lang.String]) - .describedAs("Required field. This field is the type of the message being sent." + - " The possible values are {\"set-config\"}") - - - val messageKey = - parser.accepts("key", "the type of the message being sent") - .withRequiredArg - .ofType(classOf[java.lang.String]) - .describedAs("key of the message") - - val messageValue = - parser.accepts("value", "the type of the message being sent") - .withRequiredArg - .ofType(classOf[java.lang.String]) - .describedAs("value of the message") - - def loadType(options: OptionSet) = { - if (!options.has(messageType)) { - parser.printHelpOn(System.err) - System.exit(-1) - } - options.valueOf(messageType) - } - - def loadKey(options: OptionSet): java.lang.String = { - if (options.has(messageKey)) { - options.valueOf(messageKey) - } else { - null - } - } - - def loadValue(options: OptionSet) = { - var value: java.lang.String = null - if (options.has(messageValue)) { - value = options.valueOf(messageValue) - } - - value - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java index 84ae0b5..647cadb 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java @@ -24,18 +24,14 @@ import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemFactory; -import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.SystemProducer; -import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; import org.apache.samza.util.Util; -import java.util.ArrayList; -import java.util.List; - - /** * Helper for creating mock CoordinatorStreamConsumer and * CoordinatorStreamConsumer. The CoordinatorStreamConsumer is meant to just @@ -46,7 +42,6 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { private static SystemConsumer mockConsumer = null; private static boolean useCachedConsumer = false; - public static void enableMockConsumerCache() { mockConsumer = null; useCachedConsumer = true; @@ -59,10 +54,9 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { /** * Returns a consumer that sends all configs to the coordinator stream. - * * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream. * The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util) - * ch:source:taskname -> changelogPartition for changelog + * ch:source:taskname -> changelogPartition for changelog * Everything else is processed as normal config */ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { @@ -86,10 +80,26 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { } /** - * Returns a MockCoordinatorSystemProducer. + * Returns a no-op producer. */ public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { - return new MockSystemProducer(null); + // A do-nothing producer. + return new SystemProducer() { + public void start() { + } + + public void stop() { + } + + public void register(String source) { + } + + public void send(String source, OutgoingMessageEnvelope envelope) { + } + + public void flush(String source) { + } + }; } /** @@ -105,62 +115,4 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { // Do nothing. } } - - protected static class MockSystemProducer implements SystemProducer { - private final String expectedSource; - private final List<OutgoingMessageEnvelope> envelopes; - private boolean started = false; - private boolean registered = false; - private boolean flushed = false; - - public MockSystemProducer(String expectedSource) { - this.expectedSource = expectedSource; - this.envelopes = new ArrayList<OutgoingMessageEnvelope>(); - } - - - public void start() { - started = true; - } - - public void stop() { - started = false; - } - - public void register(String source) { - registered = true; - } - - public void send(String source, OutgoingMessageEnvelope envelope) { - envelopes.add(envelope); - } - - public void flush(String source) { - flushed = true; - } - - public List<OutgoingMessageEnvelope> getEnvelopes() { - return envelopes; - } - - public boolean isStarted() { - return started; - } - - public boolean isStopped() { - return !started; - } - - public boolean isRegistered() { - return registered; - } - - public boolean isFlushed() { - return flushed; - } - - public String getExpectedSource() { - return expectedSource; - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java index 1ef07d0..68e3255 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -30,6 +31,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStream; import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; import org.codehaus.jackson.type.TypeReference; @@ -40,7 +42,7 @@ public class TestCoordinatorStreamSystemProducer { public void testCoordinatorStreamSystemProducer() { String source = "source"; SystemStream systemStream = new SystemStream("system", "stream"); - MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source); + MockSystemProducer systemProducer = new MockSystemProducer(source); MockSystemAdmin systemAdmin = new MockSystemAdmin(); CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin); CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name"); @@ -91,22 +93,59 @@ public class TestCoordinatorStreamSystemProducer { } } - private static class MockCoordinatorSystemProducer extends MockCoordinatorStreamSystemFactory.MockSystemProducer { + private static class MockSystemProducer implements SystemProducer { + private final String expectedSource; + private final List<OutgoingMessageEnvelope> envelopes; + private boolean started = false; + private boolean stopped = false; + private boolean registered = false; + private boolean flushed = false; - public MockCoordinatorSystemProducer(String expectedSource) { - super(expectedSource); + public MockSystemProducer(String expectedSource) { + this.expectedSource = expectedSource; + this.envelopes = new ArrayList<OutgoingMessageEnvelope>(); + } + + public void start() { + started = true; + } + + public void stop() { + stopped = true; } - @Override public void register(String source) { - assertEquals(super.getExpectedSource(), source); - super.register(source); + assertEquals(expectedSource, source); + registered = true; + } + + public void send(String source, OutgoingMessageEnvelope envelope) { + envelopes.add(envelope); } - @Override public void flush(String source) { - assertEquals(super.getExpectedSource(), source); - super.flush(source); + assertEquals(expectedSource, source); + flushed = true; + } + + public List<OutgoingMessageEnvelope> getEnvelopes() { + return envelopes; + } + + public boolean isStarted() { + return started; + } + + public boolean isStopped() { + return stopped; + } + + public boolean isRegistered() { + return registered; + } + + public boolean isFlushed() { + return flushed; } } } http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java deleted file mode 100644 index c484660..0000000 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.coordinator.stream; - -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.serializers.model.SamzaObjectMapper; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.codehaus.jackson.type.TypeReference; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.*; - -/** - * This class is a unit test for the CoordinatorStreamWriter class. - */ -public class TestCoordinatorStreamWriter { - - private CoordinatorStreamWriter coordinatorStreamWriter; - private MockCoordinatorStreamSystemFactory.MockSystemProducer systemProducer; - - @Test - public void testCoordinatorStream() { - - Map<String, String> configMap = new HashMap<>(); - configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory"); - configMap.put("job.name", "coordinator-stream-writer-test"); - Config config = new MapConfig(configMap); - coordinatorStreamWriter = new CoordinatorStreamWriter(config); - boolean exceptionHappened = false; - - try { - - //get coordinator system producer - Field coordinatorProducerField = coordinatorStreamWriter.getClass().getDeclaredField("coordinatorStreamSystemProducer"); - coordinatorProducerField.setAccessible(true); - assertNotNull(coordinatorProducerField.get(coordinatorStreamWriter)); - CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = (CoordinatorStreamSystemProducer) coordinatorProducerField.get(coordinatorStreamWriter); - - //get mock system producer - Field systemProducerField = coordinatorStreamSystemProducer.getClass().getDeclaredField("systemProducer"); - systemProducerField.setAccessible(true); - systemProducer = (MockCoordinatorStreamSystemFactory.MockSystemProducer) systemProducerField.get(coordinatorStreamSystemProducer); - - testStart(); - testSendMessage(); - testStop(); - - - } catch (NoSuchFieldException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - e.printStackTrace(); - exceptionHappened = true; - } - - assertFalse(exceptionHappened); - - - } - - - public void testStart() throws NoSuchFieldException, IllegalAccessException { - - //checks before starting - assertFalse(systemProducer.isStarted()); - - //start and check if start has been done successfully - coordinatorStreamWriter.start(); - assertTrue(systemProducer.isStarted()); - - } - - public void testStop() throws NoSuchFieldException, IllegalAccessException { - - //checks before stopping - assertTrue(systemProducer.isStarted()); - - //stop and check if stop has been done correctly - coordinatorStreamWriter.stop(); - assertTrue(systemProducer.isStopped()); - } - - public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - - //check a correct message - assertEquals(0, systemProducer.getEnvelopes().size()); - coordinatorStreamWriter.sendMessage("set-config", "key0", "value0"); - assertEquals(1, systemProducer.getEnvelopes().size()); - - //check invalid input is handled - boolean exceptionHappened = false; - try { - coordinatorStreamWriter.sendMessage("invalid-type", "key-invalid", "value-invalid"); - } catch (IllegalArgumentException e) { - exceptionHappened = true; - } - assertTrue(exceptionHappened); - assertEquals(1, systemProducer.getEnvelopes().size()); - - - //check sendSetConfigMessage method works correctly - Class[] sendArgs = {String.class, String.class}; - Method sendSetConfigMethod = coordinatorStreamWriter.getClass().getDeclaredMethod("sendSetConfigMessage", sendArgs); - sendSetConfigMethod.setAccessible(true); - sendSetConfigMethod.invoke(coordinatorStreamWriter, "key1", "value1"); - assertEquals(2, systemProducer.getEnvelopes().size()); - - - //check the messages are correct - List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes(); - OutgoingMessageEnvelope envelope0 = envelopes.get(0); - OutgoingMessageEnvelope envelope1 = envelopes.get(1); - TypeReference<Object[]> keyRef = new TypeReference<Object[]>() { - }; - TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() { - }; - assertEquals(2, envelopes.size()); - - assertEquals("key0", deserialize((byte[]) envelope0.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]); - Map<String, String> values = (Map<String, String>) deserialize((byte[]) envelope0.getMessage(), msgRef).get("values"); - assertEquals("value0", values.get("value")); - - assertEquals("key1", deserialize((byte[]) envelope1.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]); - values = (Map<String, String>) deserialize((byte[]) envelope1.getMessage(), msgRef).get("values"); - assertEquals("value1", values.get("value")); - } - - private <T> T deserialize(byte[] bytes, TypeReference<T> reference) { - try { - if (bytes != null) { - String valueStr = new String((byte[]) bytes, "UTF-8"); - return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference); - } - } catch (Exception e) { - throw new SamzaException(e); - } - - return null; - } - -} - http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-shell/src/main/bash/run-coordinator-stream-writer.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh deleted file mode 100644 index d2249dd..0000000 --- a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" - -exec $(dirname $0)/run-class.sh org.apache.samza.coordinator.stream.CoordinatorStreamWriter "$@"
