Repository: samza Updated Branches: refs/heads/master 023a7ce23 -> 6ae7784a5
SAMZA-950: Add backward-compatible KafkaSystemProducer constructor for Java clients Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6ae7784a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6ae7784a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6ae7784a Branch: refs/heads/master Commit: 6ae7784a579d6aa310eb0eefdc75c40e4a13d4d7 Parents: 023a7ce Author: Jacob Maes <[email protected]> Authored: Thu May 19 13:47:22 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu May 19 13:47:22 2016 -0700 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemProducer.scala | 7 +++ .../kafka/TestKafkaSystemProducerJava.java | 58 ++++++++++++++++++++ 2 files changed, 65 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6ae7784a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index d1a7a9f..3769e10 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -49,6 +49,13 @@ class KafkaSystemProducer(systemName: String, var exceptionThrown: AtomicReference[Exception] = new AtomicReference[Exception]() val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file."; + // Backward-compatible constructor for Java clients + def this(systemName: String, + retryBackoff: ExponentialSleepStrategy, + getProducer: () => Producer[Array[Byte], Array[Byte]], + metrics: KafkaSystemProducerMetrics, + clock: () => Long) = this(systemName, retryBackoff, getProducer, metrics, clock, 30) + def start() { } http://git-wip-us.apache.org/repos/asf/samza/blob/6ae7784a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java new file mode 100644 index 0000000..04c9113 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.util.ExponentialSleepStrategy; +import org.junit.Test; +import scala.runtime.AbstractFunction0; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + + +/** + * Tests the instantiation of a KafkaSystemProducer from Java clients + */ +public class TestKafkaSystemProducerJava { + @Test + public void testInstantiateProducer() { + KafkaSystemProducer ksp = new KafkaSystemProducer("SysName", new ExponentialSleepStrategy(2.0, 200, 10000), + new AbstractFunction0<Producer<byte[], byte[]>>() { + @Override + public Producer<byte[], byte[]> apply() { + return new KafkaProducer<>(new HashMap<String, Object>()); + } + }, new KafkaSystemProducerMetrics("SysName", new MetricsRegistryMap()), new AbstractFunction0<Object>() { + @Override + public Object apply() { + return System.currentTimeMillis(); + } + }); + + // Default value should have been used. + assertEquals(30, ksp.maxRetries()); + long now = System.currentTimeMillis(); + assertTrue((Long)ksp.clock().apply() >= now); + } +} \ No newline at end of file
