Repository: samza Updated Branches: refs/heads/master 01ee053ed -> b480f2535
SAMZA-704 : Create a tool to write coordinator stream Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b480f253 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b480f253 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b480f253 Branch: refs/heads/master Commit: b480f2535745a02f2153784c4ba325859e90afaa Parents: 01ee053 Author: Shadi A. Noghabi <[email protected]> Authored: Thu Jul 16 15:39:45 2015 -0700 Committer: Navina <[email protected]> Committed: Thu Jul 16 15:39:45 2015 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 + .../stream/CoordinatorStreamMessage.java | 8 +- .../stream/CoordinatorStreamWriter.java | 128 ++++++++++++++ .../CoordinatorStreamWriterCommandLine.scala | 71 ++++++++ .../MockCoordinatorStreamSystemFactory.java | 90 +++++++--- .../TestCoordinatorStreamSystemProducer.java | 59 ++----- .../stream/TestCoordinatorStreamWriter.java | 166 +++++++++++++++++++ .../main/bash/run-coordinator-stream-writer.sh | 21 +++ 8 files changed, 470 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index eef3370..6654319 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -123,6 +123,7 @@ <allow class="org.apache.samza.Partition" /> <allow class="org.apache.samza.SamzaException" /> + <allow class="joptsimple.OptionSet" /> </subpackage> <subpackage name="checkpoint"> http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java index 6bd1bd3..e5ab4fb 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java @@ -211,7 +211,7 @@ public class CoordinatorStreamMessage { /** * The type of the message is used to convert a generic - * CoordinatorStreaMessage into a specific message, such as a SetConfig + * CoordinatorStreamMessage into a specific message, such as a SetConfig * message. * * @return The type of the message. @@ -235,14 +235,14 @@ public class CoordinatorStreamMessage { } /** - * @return Whether the message signifies a delete or not. + * @return The username of a message. */ public String getUsername() { return (String) this.messageMap.get("username"); } /** - * @return Whether the message signifies a delete or not. + * @return The timestamp of a message. */ public long getTimestamp() { return (Long) this.messageMap.get("timestamp"); @@ -254,7 +254,7 @@ public class CoordinatorStreamMessage { public Map<String, Object> getMessageMap() { if (!isDelete) { Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap); - // To make sure the values is not immutable, we overwrite it with an immutable version of the the values map. + // To make sure the values is immutable, we overwrite it with an immutable version of the the values map. immutableMap.put("values", Collections.unmodifiableMap(getMessageValues())); return Collections.unmodifiableMap(immutableMap); } else { http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java new file mode 100644 index 0000000..f769756 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.stream; + +import joptsimple.OptionSet; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class writes control messages to the CoordinatorStream. + * To use this class it, first, it should be initialized by the start() method, + * and then use the sendMessage() function to send all the control messages needed. + * Finally, the stop() method should be called. + * The control messages are in the format of a (type, key, value) where: + * type: defines the kind of message of the control message from the set {set-config}. + * key: defines a key to associate with the value. This can be null as well for messages with no value + * value: defines the value being sent along with the message. This can be null as well for messages with no value. + */ +public class CoordinatorStreamWriter { + + private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class); + public final static String SOURCE = "coordinator-stream-writer"; + public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE; + + private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer; + + + public CoordinatorStreamWriter(Config config) { + coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap()); + } + + /** + * This method initializes the writer by starting a coordinator stream producer. + */ + public void start() { + coordinatorStreamSystemProducer.register(CoordinatorStreamWriter.SOURCE); + coordinatorStreamSystemProducer.start(); + log.info("Started coordinator stream writer."); + } + + /** + * This method stops the writer and closes the coordinator stream producer + */ + public void stop() { + log.info("Stopping the coordinator stream producer."); + coordinatorStreamSystemProducer.stop(); + } + + /** + * This method sends a message to the coordinator stream. This creates a message containing (type,key,value). + * For example if you want to set the number of yarn containers to 3, you would use + * ("set-config", "yarn.container.count", "3"). + * + * @param type defines the kind of message of the control message from the set {"set-config"}. + * @param key defines a key to associate with the value. This can be null for messages with no key or value. + * @param value defines the value being sent along with the message. This can be null for messages with no value. + */ + public void sendMessage(String type, String key, String value) { + //TODO: validate keys and values + if (type.equals(SET_CONFIG_TYPE)) { + sendSetConfigMessage(key, value); + } else { + throw new IllegalArgumentException("Type is invalid. The possible values are {" + SET_CONFIG_TYPE + "}"); + } + } + + /** + * This method sends message of type "set-config" to the coordinator stream + * + * @param key defines the name of the configuration being set. For example, for setting the number of yarn containers, + * the key is "yarn.container.count" + * @param value defines the value associated with the key. For example, if the key is "yarn.container.count" the value + * is the new number of containers. + */ + private void sendSetConfigMessage(String key, String value) { + log.info("sent SetConfig message with key = " + key + " and value = " + value); + coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value)); + } + + /** + * Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter + * and sends control messages. + * To run the code use the following command: + * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config-factory={config-factory} --config-path={path to config file of a job} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message} + * + * @param args input arguments for running the writer. These arguments are: + * "config-factory" = The config file factory + * "config-path" = The path to config file of a job + * "type" = type of the message being written + * "key" = [optional] key of the message being written + * "value" = [optional] value of the message being written + */ + public static void main(String[] args) { + CoordinatorStreamWriterCommandLine cmdline = new CoordinatorStreamWriterCommandLine(); + OptionSet options = cmdline.parser().parse(args); + Config config = cmdline.loadConfig(options); + String type = cmdline.loadType(options); + String key = cmdline.loadKey(options); + String value = cmdline.loadValue(options); + + CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config); + writer.start(); + writer.sendMessage(type, key, value); + writer.stop(); + } + +} + http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala new file mode 100644 index 0000000..0c17800 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.stream + +import org.apache.samza.util.CommandLine +import joptsimple.OptionSet + +class CoordinatorStreamWriterCommandLine extends CommandLine { + + val messageType = + parser.accepts("type", "the type of the message being sent.") + .withRequiredArg + .ofType(classOf[java.lang.String]) + .describedAs("Required field. This field is the type of the message being sent." + + " The possible values are {\"set-config\"}") + + + val messageKey = + parser.accepts("key", "the type of the message being sent") + .withRequiredArg + .ofType(classOf[java.lang.String]) + .describedAs("key of the message") + + val messageValue = + parser.accepts("value", "the type of the message being sent") + .withRequiredArg + .ofType(classOf[java.lang.String]) + .describedAs("value of the message") + + def loadType(options: OptionSet) = { + if (!options.has(messageType)) { + parser.printHelpOn(System.err) + System.exit(-1) + } + options.valueOf(messageType) + } + + def loadKey(options: OptionSet): java.lang.String = { + if (options.has(messageKey)) { + options.valueOf(messageKey) + } else { + null + } + } + + def loadValue(options: OptionSet) = { + var value: java.lang.String = null + if (options.has(messageValue)) { + value = options.valueOf(messageValue) + } + + value + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java index 647cadb..84ae0b5 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java @@ -24,14 +24,18 @@ import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemFactory; -import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; import org.apache.samza.util.Util; +import java.util.ArrayList; +import java.util.List; + + /** * Helper for creating mock CoordinatorStreamConsumer and * CoordinatorStreamConsumer. The CoordinatorStreamConsumer is meant to just @@ -42,6 +46,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { private static SystemConsumer mockConsumer = null; private static boolean useCachedConsumer = false; + public static void enableMockConsumerCache() { mockConsumer = null; useCachedConsumer = true; @@ -54,9 +59,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { /** * Returns a consumer that sends all configs to the coordinator stream. + * * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream. * The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util) - * ch:source:taskname -> changelogPartition for changelog + * ch:source:taskname -> changelogPartition for changelog * Everything else is processed as normal config */ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { @@ -80,26 +86,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { } /** - * Returns a no-op producer. + * Returns a MockCoordinatorSystemProducer. */ public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { - // A do-nothing producer. - return new SystemProducer() { - public void start() { - } - - public void stop() { - } - - public void register(String source) { - } - - public void send(String source, OutgoingMessageEnvelope envelope) { - } - - public void flush(String source) { - } - }; + return new MockSystemProducer(null); } /** @@ -115,4 +105,62 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { // Do nothing. } } + + protected static class MockSystemProducer implements SystemProducer { + private final String expectedSource; + private final List<OutgoingMessageEnvelope> envelopes; + private boolean started = false; + private boolean registered = false; + private boolean flushed = false; + + public MockSystemProducer(String expectedSource) { + this.expectedSource = expectedSource; + this.envelopes = new ArrayList<OutgoingMessageEnvelope>(); + } + + + public void start() { + started = true; + } + + public void stop() { + started = false; + } + + public void register(String source) { + registered = true; + } + + public void send(String source, OutgoingMessageEnvelope envelope) { + envelopes.add(envelope); + } + + public void flush(String source) { + flushed = true; + } + + public List<OutgoingMessageEnvelope> getEnvelopes() { + return envelopes; + } + + public boolean isStarted() { + return started; + } + + public boolean isStopped() { + return !started; + } + + public boolean isRegistered() { + return registered; + } + + public boolean isFlushed() { + return flushed; + } + + public String getExpectedSource() { + return expectedSource; + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java index 68e3255..1ef07d0 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,7 +30,6 @@ import org.apache.samza.SamzaException; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStream; import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; import org.codehaus.jackson.type.TypeReference; @@ -42,7 +40,7 @@ public class TestCoordinatorStreamSystemProducer { public void testCoordinatorStreamSystemProducer() { String source = "source"; SystemStream systemStream = new SystemStream("system", "stream"); - MockSystemProducer systemProducer = new MockSystemProducer(source); + MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source); MockSystemAdmin systemAdmin = new MockSystemAdmin(); CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin); CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name"); @@ -93,59 +91,22 @@ public class TestCoordinatorStreamSystemProducer { } } - private static class MockSystemProducer implements SystemProducer { - private final String expectedSource; - private final List<OutgoingMessageEnvelope> envelopes; - private boolean started = false; - private boolean stopped = false; - private boolean registered = false; - private boolean flushed = false; + private static class MockCoordinatorSystemProducer extends MockCoordinatorStreamSystemFactory.MockSystemProducer { - public MockSystemProducer(String expectedSource) { - this.expectedSource = expectedSource; - this.envelopes = new ArrayList<OutgoingMessageEnvelope>(); - } - - public void start() { - started = true; - } - - public void stop() { - stopped = true; + public MockCoordinatorSystemProducer(String expectedSource) { + super(expectedSource); } + @Override public void register(String source) { - assertEquals(expectedSource, source); - registered = true; - } - - public void send(String source, OutgoingMessageEnvelope envelope) { - envelopes.add(envelope); + assertEquals(super.getExpectedSource(), source); + super.register(source); } + @Override public void flush(String source) { - assertEquals(expectedSource, source); - flushed = true; - } - - public List<OutgoingMessageEnvelope> getEnvelopes() { - return envelopes; - } - - public boolean isStarted() { - return started; - } - - public boolean isStopped() { - return stopped; - } - - public boolean isRegistered() { - return registered; - } - - public boolean isFlushed() { - return flushed; + assertEquals(super.getExpectedSource(), source); + super.flush(source); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java new file mode 100644 index 0000000..c484660 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.stream; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.codehaus.jackson.type.TypeReference; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +/** + * This class is a unit test for the CoordinatorStreamWriter class. + */ +public class TestCoordinatorStreamWriter { + + private CoordinatorStreamWriter coordinatorStreamWriter; + private MockCoordinatorStreamSystemFactory.MockSystemProducer systemProducer; + + @Test + public void testCoordinatorStream() { + + Map<String, String> configMap = new HashMap<>(); + configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory"); + configMap.put("job.name", "coordinator-stream-writer-test"); + Config config = new MapConfig(configMap); + coordinatorStreamWriter = new CoordinatorStreamWriter(config); + boolean exceptionHappened = false; + + try { + + //get coordinator system producer + Field coordinatorProducerField = coordinatorStreamWriter.getClass().getDeclaredField("coordinatorStreamSystemProducer"); + coordinatorProducerField.setAccessible(true); + assertNotNull(coordinatorProducerField.get(coordinatorStreamWriter)); + CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = (CoordinatorStreamSystemProducer) coordinatorProducerField.get(coordinatorStreamWriter); + + //get mock system producer + Field systemProducerField = coordinatorStreamSystemProducer.getClass().getDeclaredField("systemProducer"); + systemProducerField.setAccessible(true); + systemProducer = (MockCoordinatorStreamSystemFactory.MockSystemProducer) systemProducerField.get(coordinatorStreamSystemProducer); + + testStart(); + testSendMessage(); + testStop(); + + + } catch (NoSuchFieldException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + e.printStackTrace(); + exceptionHappened = true; + } + + assertFalse(exceptionHappened); + + + } + + + public void testStart() throws NoSuchFieldException, IllegalAccessException { + + //checks before starting + assertFalse(systemProducer.isStarted()); + + //start and check if start has been done successfully + coordinatorStreamWriter.start(); + assertTrue(systemProducer.isStarted()); + + } + + public void testStop() throws NoSuchFieldException, IllegalAccessException { + + //checks before stopping + assertTrue(systemProducer.isStarted()); + + //stop and check if stop has been done correctly + coordinatorStreamWriter.stop(); + assertTrue(systemProducer.isStopped()); + } + + public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + + //check a correct message + assertEquals(0, systemProducer.getEnvelopes().size()); + coordinatorStreamWriter.sendMessage("set-config", "key0", "value0"); + assertEquals(1, systemProducer.getEnvelopes().size()); + + //check invalid input is handled + boolean exceptionHappened = false; + try { + coordinatorStreamWriter.sendMessage("invalid-type", "key-invalid", "value-invalid"); + } catch (IllegalArgumentException e) { + exceptionHappened = true; + } + assertTrue(exceptionHappened); + assertEquals(1, systemProducer.getEnvelopes().size()); + + + //check sendSetConfigMessage method works correctly + Class[] sendArgs = {String.class, String.class}; + Method sendSetConfigMethod = coordinatorStreamWriter.getClass().getDeclaredMethod("sendSetConfigMessage", sendArgs); + sendSetConfigMethod.setAccessible(true); + sendSetConfigMethod.invoke(coordinatorStreamWriter, "key1", "value1"); + assertEquals(2, systemProducer.getEnvelopes().size()); + + + //check the messages are correct + List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes(); + OutgoingMessageEnvelope envelope0 = envelopes.get(0); + OutgoingMessageEnvelope envelope1 = envelopes.get(1); + TypeReference<Object[]> keyRef = new TypeReference<Object[]>() { + }; + TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() { + }; + assertEquals(2, envelopes.size()); + + assertEquals("key0", deserialize((byte[]) envelope0.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]); + Map<String, String> values = (Map<String, String>) deserialize((byte[]) envelope0.getMessage(), msgRef).get("values"); + assertEquals("value0", values.get("value")); + + assertEquals("key1", deserialize((byte[]) envelope1.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]); + values = (Map<String, String>) deserialize((byte[]) envelope1.getMessage(), msgRef).get("values"); + assertEquals("value1", values.get("value")); + } + + private <T> T deserialize(byte[] bytes, TypeReference<T> reference) { + try { + if (bytes != null) { + String valueStr = new String((byte[]) bytes, "UTF-8"); + return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference); + } + } catch (Exception e) { + throw new SamzaException(e); + } + + return null; + } + +} + http://git-wip-us.apache.org/repos/asf/samza/blob/b480f253/samza-shell/src/main/bash/run-coordinator-stream-writer.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh new file mode 100644 index 0000000..d2249dd --- /dev/null +++ b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" + +exec $(dirname $0)/run-class.sh org.apache.samza.coordinator.stream.CoordinatorStreamWriter "$@"
