Repository: incubator-gobblin Updated Branches: refs/heads/master 4e9453fba -> d769b2144
[GOBBLIN-225] Fix cloning of ControlMessages in PartitionDataWriterMessageHandler Closes #2079 from htran1/partition_handler Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d769b214 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d769b214 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d769b214 Branch: refs/heads/master Commit: d769b2144c7239a4c084c7ba65daadf37903b2ae Parents: 4e9453f Author: Hung Tran <[email protected]> Authored: Mon Aug 28 11:46:45 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Aug 28 11:46:45 2017 -0700 ---------------------------------------------------------------------- .../gobblin/writer/PartitionedDataWriter.java | 8 ++++- .../gobblin/writer/PartitionedWriterTest.java | 37 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d769b214/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java index d915730..a667b86 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java @@ -45,6 +45,7 @@ import org.apache.gobblin.records.ControlMessageHandler; import org.apache.gobblin.source.extractor.CheckpointableWatermark; import org.apache.gobblin.stream.ControlMessage; import org.apache.gobblin.stream.RecordEnvelope; +import org.apache.gobblin.stream.StreamEntity; import org.apache.gobblin.util.AvroUtils; import org.apache.gobblin.util.FinalState; import org.apache.gobblin.writer.partitioner.WriterPartitioner; @@ -330,9 +331,14 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin private class PartitionDataWriterMessageHandler implements ControlMessageHandler { @Override public void handleMessage(ControlMessage message) { + StreamEntity.ForkCloner cloner = message.forkCloner(); + for (DataWriter writer : PartitionedDataWriter.this.partitionWriters.asMap().values()) { - writer.getMessageHandler().handleMessage((ControlMessage) message.getSingleClone()); + ControlMessage cloned = (ControlMessage) cloner.getClone(); + writer.getMessageHandler().handleMessage(cloned); } + + cloner.close(); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d769b214/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java index 3d5923d..c00c823 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java @@ -23,6 +23,8 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.ack.BasicAckableForTesting; +import org.apache.gobblin.stream.FlushControlMessage; import org.testng.Assert; import org.testng.annotations.Test; @@ -164,4 +166,39 @@ public class PartitionedWriterTest { } } + @Test + public void testControlMessageHandler() throws IOException { + + State state = new State(); + state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, TestPartitioner.class.getCanonicalName()); + + TestPartitionAwareWriterBuilder builder = new TestPartitionAwareWriterBuilder(); + + PartitionedDataWriter writer = new PartitionedDataWriter<String, String>(builder, state); + + Assert.assertEquals(builder.actions.size(), 0); + + String record1 = "abc"; + writer.writeEnvelope(new RecordEnvelope(record1)); + + String record2 = "123"; + writer.writeEnvelope(new RecordEnvelope(record2)); + + FlushControlMessage controlMessage = new FlushControlMessage<>(new FlushControlMessage.FlushReason("test")); + BasicAckableForTesting ackable = new BasicAckableForTesting(); + + controlMessage.addCallBack(ackable); + Assert.assertEquals(ackable.acked, 0); + + // when the control message is cloned properly then this does not raise an error + writer.getMessageHandler().handleMessage(controlMessage); + + // message handler does not ack since consumeRecordStream does acking for control messages + // this should be revisited when control message error handling is changed + controlMessage.ack(); + + Assert.assertEquals(ackable.acked, 1); + + writer.close(); + } }
