Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 6aa1357a1 -> ab800233b (forced update)
MLHR-1925 #resolve #comment report stats of offsets in committed window only
MLHR-1928 #resolve #comment update offsets after emit


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/41caa952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/41caa952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/41caa952

Branch: refs/heads/master
Commit: 41caa952e4a6b2f534fbac5230e68ea0607421a8
Parents: f1fee8f
Author: Siyuan Hua <[email protected]>
Authored: Sun Dec 13 21:55:38 2015 -0800
Committer: Siyuan Hua <[email protected]>
Committed: Sun Dec 13 21:55:38 2015 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java     | 53 ++++++++++++++++----
 .../contrib/kafka/OffsetManager.java          |  2 +-
 .../contrib/kafka/SimpleKafkaConsumer.java    |  4 +-
 .../contrib/kafka/KafkaInputOperatorTest.java | 41 ++++++++++++---
 .../contrib/kafka/KafkaOperatorTestBase.java  |  2 +-
 .../contrib/kafka/OffsetManagerTest.java      | 20 +++++---
 6 files changed, 93 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 952609f..ec50615 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -38,10 +38,8 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 
 import java.io.ByteArrayOutputStream;
@@ -74,6 +72,8 @@ import kafka.message.MessageAndOffset;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,7 +145,14 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   protected transient long currentWindowId;
   protected transient int operatorId;
   protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState;
-  protected transient Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
+  /**
+   * Offsets that are checkpointed for recovery
+   */
+  protected Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
+  /**
+   * offset history with window id
+   */
+  protected transient List<Pair<Long, Map<KafkaPartition, Long>>> offsetTrackHistory = new LinkedList<>();
   private transient OperatorContext context = null;
   // By default the partition policy is 1:1
   public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
@@ -212,6 +219,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   {
     logger.debug("consumer {} topic {} cacheSize {}", consumer, consumer.getTopic(), consumer.getCacheSize());
     consumer.create();
+    // reset the offsets to the checkpointed ones
+    if (consumer instanceof SimpleKafkaConsumer && !offsetStats.isEmpty()) {
+      ((SimpleKafkaConsumer)consumer).resetOffset(offsetStats);
+    }
     this.context = context;
     operatorId = context.getId();
     if(consumer instanceof HighlevelKafkaConsumer && !(idempotentStorageManager instanceof IdempotentStorageManager.NoopIdempotentStorageManager)) {
@@ -301,12 +312,13 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   @Override
   public void endWindow()
   {
+    //TODO depends on APEX-78: only need to keep the history of windows that still need to be committed
+    if (getConsumer() instanceof SimpleKafkaConsumer) {
+      Map<KafkaPartition, Long> carryOn = new HashMap<>(offsetStats);
+      offsetTrackHistory.add(Pair.of(currentWindowId, carryOn));
+    }
     if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
       try {
-        if((getConsumer() instanceof SimpleKafkaConsumer)) {
-          SimpleKafkaConsumer cons = (SimpleKafkaConsumer) getConsumer();
-          context.setCounters(cons.getConsumerStats(offsetStats));
-        }
         idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
       }
       catch (IOException e) {
@@ -326,6 +338,23 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   @Override
   public void committed(long windowId)
   {
+    if ((getConsumer() instanceof SimpleKafkaConsumer)) {
+      SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
+      for (Iterator<Pair<Long, Map<KafkaPartition, Long>>> iter = offsetTrackHistory.iterator(); iter.hasNext(); ) {
+        Pair<Long, Map<KafkaPartition, Long>> item = iter.next();
+        if (item.getLeft() < windowId) {
+          iter.remove();
+          continue;
+        } else if (item.getLeft() == windowId) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("report offsets {} ", Joiner.on(';').withKeyValueSeparator("=").join(item.getRight()));
+          }
+          context.setCounters(cons.getConsumerStats(item.getRight()));
+        }
+        break;
+      }
+    }
+
     try {
       idempotentStorageManager.deleteUpTo(operatorId, windowId);
     }
@@ -365,9 +394,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     }
     for (int i = 0; i < count; i++) {
       KafkaConsumer.KafkaMessage message = consumer.pollMessage();
-      // Ignore the duplicate messages
-      if(offsetStats.containsKey(message.kafkaPart) && message.offSet <= offsetStats.get(message.kafkaPart))
-        continue;
       emitTuple(message.msg);
       offsetStats.put(message.kafkaPart, message.offSet);
       MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
@@ -586,7 +612,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       Input lInput = new Input(bos.toByteArray());
       @SuppressWarnings("unchecked")
       Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass()));
-      p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
+      if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
+        p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
+        if (initOffsets != null) {
+          p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+        }
+      }
       newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
 
       PartitionInfo pif = new PartitionInfo();
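
The heart of MLHR-1925 is the pairing of endWindow() and committed() above:
every window's offsets are snapshotted into offsetTrackHistory, and consumer
stats are published only once the platform has committed that window, at which
point all older snapshots can be dropped. Below is a minimal standalone sketch
of this pattern, assuming a plain String partition key in place of
KafkaPartition; the CommittedOffsetTracker class and its method names are
illustrative only, not part of the commit.

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.Pair;

public class CommittedOffsetTracker
{
  // live offsets, written only after a message has actually been emitted (MLHR-1928)
  private final Map<String, Long> offsetStats = new HashMap<>();
  // one snapshot per streaming window, pruned as windows are committed (MLHR-1925)
  private final List<Pair<Long, Map<String, Long>>> offsetTrackHistory = new LinkedList<>();

  public void onEmit(String partition, long offset)
  {
    offsetStats.put(partition, offset);
  }

  public void endWindow(long windowId)
  {
    // snapshot the offsets as they stood at the end of this window
    Map<String, Long> snapshot = new HashMap<>(offsetStats);
    offsetTrackHistory.add(Pair.of(windowId, snapshot));
  }

  public Map<String, Long> committed(long windowId)
  {
    for (Iterator<Pair<Long, Map<String, Long>>> it = offsetTrackHistory.iterator(); it.hasNext();) {
      Pair<Long, Map<String, Long>> entry = it.next();
      if (entry.getLeft() < windowId) {
        it.remove(); // snapshots older than the committed window are obsolete
        continue;
      }
      // first snapshot at or beyond the committed window; report only an exact match
      return entry.getLeft() == windowId ? entry.getRight() : null;
    }
    return null;
  }
}

In the operator, the snapshot returned for the committed window is what feeds
context.setCounters(cons.getConsumerStats(...)), so the reported counters always
describe fully committed state, and the history is bounded to the uncommitted tail.
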
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
index 660b3d7..5eb0575 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
@@ -30,7 +30,7 @@ import java.util.Map;
  */
 public interface OffsetManager
 {
-
+//
   /**
    *


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 58ef95f..e10502b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -305,13 +305,13 @@ public class SimpleKafkaConsumer extends KafkaConsumer
 
   // This map maintains mapping between kafka partition and it's leader broker in realtime monitored by a thread
   private transient final ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
-
+
   /**
    * Track offset for each partition, so operator could start from the last serialized state Use ConcurrentHashMap to
    * avoid ConcurrentModificationException without blocking reads when updating in another thread(hashtable or
    * synchronizedmap)
    */
-  private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
+  private final transient ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
 
   private transient AtomicReference<Throwable> monitorException;
 
   private transient AtomicInteger monitorExceptionCount;


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index eeb9d20..76b3550 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -19,6 +19,7 @@ package com.datatorrent.contrib.kafka;
 
 import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StringCodec;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -27,6 +28,7 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
+import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
@@ -41,6 +43,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -54,6 +58,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
   static AtomicInteger tupleCount = new AtomicInteger();
   static CountDownLatch latch;
+  static boolean isSuicide = false;
+  static int suicideTrigger = 3000;
 
   /**
    * Test Operator to collect tuples from KafkaSingleInputStringOperator.
@@ -68,6 +74,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   public static class CollectorInputPort<T> extends DefaultInputPort<T>
   {
+    private int k = 0;
+
     public CollectorInputPort(String id, Operator module)
     {
       super();
@@ -76,6 +84,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     @Override
     public void process(T tuple)
     {
+      if (isSuicide && k++ == suicideTrigger) {
+        //you can only kill yourself once
+        isSuicide = false;
+        throw new RuntimeException();
+      }
       if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) {
         if (latch != null) {
           latch.countDown();
@@ -84,12 +97,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       }
       tupleCount.incrementAndGet();
     }
-
-    @Override
-    public void setConnected(boolean flag)
-    {
-      tupleCount.set(0);
-    }
   }
 
   /**
@@ -124,6 +131,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     // Create KafkaSinglePortStringInputOperator
     KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
+    if(isSuicide) {
+      // make some extreme assumptions to make it fail if checkpointing wrong offsets
+      dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent("target/ck", new Configuration()));
+      node.setMaxTuplesPerWindow(500);
+    }
+
     if(idempotent) {
       node.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
     }
@@ -131,6 +145,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     node.setConsumer(consumer);
 
+    consumer.setCacheSize(5000);
+
     if (isValid) {
       node.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
     }
@@ -151,7 +167,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     Assert.assertTrue("TIMEOUT: 30s ", latch.await(300000, TimeUnit.MILLISECONDS));
 
     // Check results
-    Assert.assertEquals("Tuple count", totalCount, tupleCount.intValue());
+    Assert.assertTrue("Expected count >= " + totalCount + "; Actual count " + tupleCount.intValue(),
+        totalCount <= tupleCount.intValue());
     logger.debug(String.format("Number of emitted tuples: %d", tupleCount.intValue()));
 
     p.close();
@@ -182,6 +199,16 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   }
 
   @Test
+  public void testKafkaInputOperator_SimpleSuicide() throws Exception
+  {
+    int totalCount = 10000;
+    KafkaConsumer k = new SimpleKafkaConsumer();
+    k.setInitialOffset("earliest");
+    isSuicide = true;
+    testKafkaInputOperator(1000, totalCount, k, true, false);
+  }
+
+  @Test
   public void testKafkaInputOperator_Simple_Idempotent() throws Exception
   {
     int totalCount = 10000;


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
index c21b2e4..f4f5ef2 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
@@ -58,7 +58,7 @@ public class KafkaOperatorTestBase
   // multiple cluster
   private final ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
 
-  public String baseDir = "/tmp";
+  public String baseDir = "target";
 
   private final String zkBaseDir = "zookeeper-server-data";
   private final String kafkaBaseDir = "kafka-server-data";


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index 7b36ea8..04fe282 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
@@ -64,6 +65,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   static CountDownLatch latch;
   static final String OFFSET_FILE = ".offset";
   static long initialPos = 10l;
+  static Path baseFolder = new Path("target");
 
   public static class TestOffsetManager implements OffsetManager{
 
@@ -98,8 +100,8 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     offsets.putAll(offsetsOfPartitions);
 
     try {
-      Path tmpFile = new Path(filename + ".tmp");
-      Path dataFile = new Path(filename);
+      Path tmpFile = new Path(baseFolder, filename + ".tmp");
+      Path dataFile = new Path(baseFolder, filename);
       FSDataOutputStream out = fs.create(tmpFile, true);
       for (Entry<KafkaPartition, Long> e : offsets.entrySet()) {
         out.writeBytes(e.getKey() +", " + e.getValue() + "\n");
@@ -142,7 +144,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   /**
    * Test Operator to collect tuples from KafkaSingleInputStringOperator.
    *
-   * @param <T>
+   * @param
    */
  public static class CollectorModule extends BaseOperator
  {
@@ -226,7 +228,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   private void cleanFile()
   {
     try {
-      FileSystem.get(new Configuration()).delete(new Path(TEST_TOPIC + OFFSET_FILE), true);
+      FileSystem.get(new Configuration()).delete(new Path(baseFolder, TEST_TOPIC + OFFSET_FILE), true);
     }
     catch (IOException e) {
     }
@@ -278,18 +280,22 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     // Connect ports
     dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
+    dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+
     // Create local cluster
     final LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(true);
 
     lc.runAsync();
-    // Wait 30s for consumer finish consuming all the messages and offsets has been updated to 100
-    assertTrue("TIMEOUT: 30s, collected " + collectedTuples + " tuples", latch.await(30000, TimeUnit.MILLISECONDS));
+
+    boolean isNotTimeout = latch.await(30000, TimeUnit.MILLISECONDS);
+    // Wait 30s for consumer finish consuming all the messages and offsets has been updated to 100
+    assertTrue("TIMEOUT: 30s, collected " + collectedTuples.size() + " tuples", isNotTimeout);
+
     // Check results
-    assertEquals("Tuple count", expectedCount, collectedTuples.size());
+    assertEquals("Tuple count " + collectedTuples, expectedCount, collectedTuples.size());
     logger.debug(String.format("Number of emitted tuples: %d", collectedTuples.size()));
 
     p.close();
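
Taken together with MLHR-1928, the removal of the duplicate-message guard in
emitTuples() works because offsetStats is now written only after emitTuple()
succeeds, and setup() replays from the checkpointed map via resetOffset(). A
sketch of that recovery contract follows; FakeConsumer, its seek() method, and
the "resume at offset + 1" convention are assumptions for illustration, standing
in for SimpleKafkaConsumer.resetOffset(), not the actual consumer API.

import java.util.HashMap;
import java.util.Map;

interface FakeConsumer
{
  void seek(String partition, long offset);
}

public class OffsetRecoverySketch
{
  // offsets restored from the checkpoint; only offsets of already-emitted
  // messages can appear here, because the map is updated after emitTuple()
  private final Map<String, Long> offsetStats = new HashMap<>();

  public void setup(FakeConsumer consumer)
  {
    // mirrors the new setup() branch: reset only when saved state exists
    if (!offsetStats.isEmpty()) {
      offsetStats.forEach((part, off) -> consumer.seek(part, off + 1));
    }
  }
}

A crash after tuples of a not-yet-checkpointed window were emitted means those
tuples are emitted again on recovery, which is why the test assertion was
relaxed from strict equality to totalCount <= tupleCount.intValue()
(at-least-once delivery), and why testKafkaInputOperator_SimpleSuicide
deliberately kills the collector mid-stream with checkpointing every window.
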
