Repository: incubator-gobblin Updated Branches: refs/heads/master 6160adca2 -> 99b715238
[GOBBLIN-234] Add a ControlMessageInjector that generates metadata up… Closes #2107 from htran1/metadata_update Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/99b71523 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/99b71523 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/99b71523 Branch: refs/heads/master Commit: 99b71523887c7d7f780ac78c20941093bd3751f5 Parents: 6160adc Author: Hung Tran <[email protected]> Authored: Wed Sep 20 14:25:02 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Sep 20 14:25:02 2017 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 4 +- .../org/apache/gobblin/converter/Converter.java | 4 +- .../gobblin/stream/FlushControlMessage.java | 18 +++--- .../gobblin/converter/AsyncConverter1to1.java | 2 +- .../writer/CloseOnFlushWriterWrapper.java | 65 +++++++++++++++++--- .../gobblin/writer/PartitionedDataWriter.java | 13 +++- .../writer/CloseOnFlushWriterWrapperTest.java | 36 +++++------ .../gobblin/writer/PartitionedWriterTest.java | 2 +- .../gobblin/runtime/TestRecordStream.java | 62 ++++++++----------- 9 files changed, 125 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index e54a54b..303ad15 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ 
-342,6 +342,7 @@ public class ConfigurationKeys { public static final String SIMPLE_WRITER_DELIMITER = "simple.writer.delimiter"; public static final String SIMPLE_WRITER_PREPEND_SIZE = "simple.writer.prepend.size"; + // Internal use only - used to send metadata to publisher public static final String WRITER_METADATA_KEY = WRITER_PREFIX + "._internal.metadata"; public static final String WRITER_PARTITION_PATH_KEY = WRITER_PREFIX + "._internal.partition.path"; @@ -354,9 +355,6 @@ public class ConfigurationKeys { public static final String WRITER_BYTES_WRITTEN = WRITER_PREFIX + ".bytes.written"; public static final String WRITER_EARLIEST_TIMESTAMP = WRITER_PREFIX + ".earliest.timestamp"; public static final String WRITER_AVERAGE_TIMESTAMP = WRITER_PREFIX + ".average.timestamp"; - // Used internally to enable closing of the writer on flush - public static final String WRITER_CLOSE_ON_FLUSH_KEY = WRITER_PREFIX + ".closeOnFlush"; - public static final boolean DEFAULT_WRITER_CLOSE_ON_FLUSH = false; /** * Configuration properties used by the quality checker. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java index b4a45b7..514f5be 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java @@ -128,7 +128,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState WorkUnitState workUnitState) throws SchemaConversionException { init(workUnitState); this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(), - Optional.of(convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState))).build(); + Optional.fromNullable(convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState))).build(); Flowable<StreamEntity<DO>> outputStream = inputStream.getRecordStream() .flatMap(in -> { @@ -141,7 +141,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState if (in instanceof MetadataUpdateControlMessage) { this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput( ((MetadataUpdateControlMessage) in).getGlobalMetadata(), - Optional.of(convertSchema((SI)((MetadataUpdateControlMessage) in).getGlobalMetadata() + Optional.fromNullable(convertSchema((SI)((MetadataUpdateControlMessage) in).getGlobalMetadata() .getSchema(), workUnitState))).build(); out = new MetadataUpdateControlMessage<SO, DO>(this.outputGlobalMetadata); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java ---------------------------------------------------------------------- diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java index 8760754..7c23dd3 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java @@ -16,7 +16,9 @@ */ package org.apache.gobblin.stream; +import lombok.AccessLevel; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -25,21 +27,23 @@ import lombok.Getter; * Control message for flushing writers * @param <D> */ -@AllArgsConstructor +@AllArgsConstructor(access= AccessLevel.PRIVATE) @EqualsAndHashCode +@Builder public class FlushControlMessage<D> extends ControlMessage<D> { @Getter - private final FlushReason flushReason; + private final String flushReason; + @Getter + private final FlushType flushType; @Override protected StreamEntity<D> buildClone() { - return new FlushControlMessage(flushReason); + return FlushControlMessage.<D>builder().flushReason(this.flushReason).flushType(this.flushType).build(); } @AllArgsConstructor - @EqualsAndHashCode - public static class FlushReason { - @Getter - private final String reason; + public enum FlushType { + FlUSH, + FLUSH_AND_CLOSE /* use this type to request a close after flush */ } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java index b5092ef..6d56e8a 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java +++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java @@ -95,7 +95,7 @@ public abstract class AsyncConverter1to1<SI, SO, DI, DO> extends Converter<SI, S } }, false, maxConcurrentAsyncConversions); return inputStream.withRecordStream(outputStream, GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(), - Optional.of(outputSchema)).build()); + Optional.fromNullable(outputSchema)).build()); } @RequiredArgsConstructor http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java index 5b30cba..66e5a26 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java @@ -29,6 +29,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.records.ControlMessageHandler; import org.apache.gobblin.records.FlushControlMessageHandler; +import org.apache.gobblin.stream.ControlMessage; +import org.apache.gobblin.stream.FlushControlMessage; +import org.apache.gobblin.stream.MetadataUpdateControlMessage; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.util.Decorator; import org.apache.gobblin.util.FinalState; @@ -40,6 +43,13 @@ import org.apache.gobblin.util.FinalState; * @param <D> */ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements Decorator, FinalState, Retriable { + // Used internally to enable closing of the writer on flush + public static final String WRITER_CLOSE_ON_FLUSH_KEY = ConfigurationKeys.WRITER_PREFIX + ".closeOnFlush"; + 
public static final boolean DEFAULT_WRITER_CLOSE_ON_FLUSH = false; + + public static final String WRITER_CLOSE_ON_METADATA_UPDATE = ConfigurationKeys.WRITER_PREFIX + ".closeOnMetadataUpdate"; + public static final boolean DEFAULT_CLOSE_ON_METADATA_UPDATE = true; + private static final Logger LOG = LoggerFactory.getLogger(CloseOnFlushWriterWrapper.class); private final State state; @@ -48,6 +58,8 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De private boolean closed; // is the close functionality enabled? private final boolean closeOnFlush; + private final ControlMessageHandler controlMessageHandler; + private final boolean closeOnMetadataUpdate; public CloseOnFlushWriterWrapper(Supplier<DataWriter<D>> writerSupplier, State state) { Preconditions.checkNotNull(state, "State is required."); @@ -58,8 +70,12 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De this.writer = writerSupplier.get(); this.closed = false; - this.closeOnFlush = this.state.getPropAsBoolean(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, - ConfigurationKeys.DEFAULT_WRITER_CLOSE_ON_FLUSH); + this.closeOnFlush = this.state.getPropAsBoolean(WRITER_CLOSE_ON_FLUSH_KEY, + DEFAULT_WRITER_CLOSE_ON_FLUSH); + + this.controlMessageHandler = new CloseOnFlushWriterMessageHandler(); + this.closeOnMetadataUpdate = this.state.getPropAsBoolean(WRITER_CLOSE_ON_METADATA_UPDATE, + DEFAULT_CLOSE_ON_METADATA_UPDATE); } @Override @@ -129,13 +145,7 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De @Override public ControlMessageHandler getMessageHandler() { - // if close on flush is configured then create a handler that will invoke the wrapper's flush to perform close - // on flush operations, otherwise return the wrapped writer's handler. 
- if (this.closeOnFlush) { - return new FlushControlMessageHandler(this); - } else { - return this.writer.getMessageHandler(); - } + return this.controlMessageHandler; } /** @@ -144,12 +154,47 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De */ @Override public void flush() throws IOException { + flush(this.closeOnFlush); + } + + private void flush(boolean close) throws IOException { this.writer.flush(); // commit data then close the writer - if (this.closeOnFlush) { + if (close) { commit(); close(); } } + + /** + * A {@link ControlMessageHandler} that handles closing on flush + */ + private class CloseOnFlushWriterMessageHandler implements ControlMessageHandler { + @Override + public void handleMessage(ControlMessage message) { + ControlMessageHandler underlyingHandler = CloseOnFlushWriterWrapper.this.writer.getMessageHandler(); + + // let underlying writer handle the control messages first + underlyingHandler.handleMessage(message); + + // Handle close after flush logic. The file is closed if requested by the flush or the configuration. 
+ if ((message instanceof FlushControlMessage && + (CloseOnFlushWriterWrapper.this.closeOnFlush || + ((FlushControlMessage) message).getFlushType() == FlushControlMessage.FlushType.FLUSH_AND_CLOSE)) || + (message instanceof MetadataUpdateControlMessage && CloseOnFlushWriterWrapper.this.closeOnMetadataUpdate)) { + try { + // avoid flushing again + if (underlyingHandler instanceof FlushControlMessageHandler) { + commit(); + close(); + } else { + flush(true); + } + } catch (IOException e) { + throw new RuntimeException("Could not flush when handling FlushControlMessage", e); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java index a667b86..83dd074 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java @@ -44,6 +44,7 @@ import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterD import org.apache.gobblin.records.ControlMessageHandler; import org.apache.gobblin.source.extractor.CheckpointableWatermark; import org.apache.gobblin.stream.ControlMessage; +import org.apache.gobblin.stream.MetadataUpdateControlMessage; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.stream.StreamEntity; import org.apache.gobblin.util.AvroUtils; @@ -68,8 +69,10 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin private final Optional<WriterPartitioner> partitioner; private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters; private final Optional<PartitionAwareDataWriterBuilder> builder; + private final DataWriterBuilder 
writerBuilder; private final boolean shouldPartition; private final Closer closer; + private final ControlMessageHandler controlMessageHandler; private boolean isSpeculativeAttemptSafe; private boolean isWatermarkCapable; @@ -79,6 +82,8 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin this.isWatermarkCapable = true; this.baseWriterId = builder.getWriterId(); this.closer = Closer.create(); + this.writerBuilder = builder; + this.controlMessageHandler = new PartitionDataWriterMessageHandler(); this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord, DataWriter<D>>() { @Override public DataWriter<D> load(final GenericRecord key) @@ -322,7 +327,7 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin @Override public ControlMessageHandler getMessageHandler() { - return new PartitionDataWriterMessageHandler(); + return this.controlMessageHandler; } /** @@ -333,6 +338,12 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin public void handleMessage(ControlMessage message) { StreamEntity.ForkCloner cloner = message.forkCloner(); + // update the schema used to build writers + if (message instanceof MetadataUpdateControlMessage) { + PartitionedDataWriter.this.writerBuilder.withSchema(((MetadataUpdateControlMessage) message) + .getGlobalMetadata().getSchema()); + } + for (DataWriter writer : PartitionedDataWriter.this.partitionWriters.asMap().values()) { ControlMessage cloned = (ControlMessage) cloner.getClone(); writer.getMessageHandler().handleMessage(cloned); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java index 6006572..ef435b1 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java @@ -27,6 +27,7 @@ import org.testng.annotations.Test; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.records.ControlMessageHandler; +import org.apache.gobblin.records.FlushControlMessageHandler; import org.apache.gobblin.stream.ControlMessage; import org.apache.gobblin.stream.FlushControlMessage; import org.apache.gobblin.stream.RecordEnvelope; @@ -43,86 +44,85 @@ public class CloseOnFlushWriterWrapperTest { byte[] record = new byte[]{'a', 'b', 'c', 'd'}; writer.writeEnvelope(new RecordEnvelope(record)); - writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); + + writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build()); Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); Assert.assertEquals(dummyWriters.get(0).closeCount, 0); Assert.assertFalse(dummyWriters.get(0).committed); - Assert.assertTrue(dummyWriters.get(0).handlerCalled); + Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1); } @Test public void testCloseOnFlushEnabled() throws IOException { WorkUnitState state = new WorkUnitState(); - state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true"); + state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true"); List<DummyWriter> dummyWriters = new ArrayList<>(); CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state); byte[] record = new byte[]{'a', 'b', 'c', 'd'}; writer.writeEnvelope(new RecordEnvelope(record)); - 
writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); + writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build()); Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); Assert.assertEquals(dummyWriters.get(0).closeCount, 1); Assert.assertTrue(dummyWriters.get(0).committed); - // handler from CloseOnFlushWriterWrapper should have been called instead - Assert.assertFalse(dummyWriters.get(0).handlerCalled); + Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1); } @Test public void testWriteAfterFlush() throws IOException { WorkUnitState state = new WorkUnitState(); - state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true"); + state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true"); List<DummyWriter> dummyWriters = new ArrayList<>(); CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state); byte[] record = new byte[]{'a', 'b', 'c', 'd'}; writer.writeEnvelope(new RecordEnvelope(record)); - writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); + writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build()); Assert.assertEquals(dummyWriters.size(), 1); Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); Assert.assertEquals(dummyWriters.get(0).closeCount, 1); Assert.assertTrue(dummyWriters.get(0).committed); - Assert.assertFalse(dummyWriters.get(0).handlerCalled); + Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1); writer.writeEnvelope(new RecordEnvelope(record)); - writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); + writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build()); 
Assert.assertEquals(dummyWriters.size(), 2); Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(1).flushCount, 1); Assert.assertEquals(dummyWriters.get(1).closeCount, 1); Assert.assertTrue(dummyWriters.get(1).committed); - Assert.assertFalse(dummyWriters.get(1).handlerCalled); + Assert.assertEquals(dummyWriters.get(1).handlerCalled, 1); } @Test public void testCloseAfterFlush() throws IOException { WorkUnitState state = new WorkUnitState(); - state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true"); + state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true"); List<DummyWriter> dummyWriters = new ArrayList<>(); CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state); byte[] record = new byte[]{'a', 'b', 'c', 'd'}; writer.writeEnvelope(new RecordEnvelope(record)); - writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); + writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build()); Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); Assert.assertEquals(dummyWriters.get(0).closeCount, 1); Assert.assertTrue(dummyWriters.get(0).committed); - // handler from CloseOnFlushWriterWrapper should have been called instead - Assert.assertFalse(dummyWriters.get(0).handlerCalled); + Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1); writer.close(); @@ -148,7 +148,7 @@ public class CloseOnFlushWriterWrapperTest { private int flushCount = 0; private int closeCount = 0; private boolean committed = false; - private boolean handlerCalled = false; + private int handlerCalled = 0; DummyWriter() { } @@ -190,10 +190,10 @@ public class CloseOnFlushWriterWrapperTest { @Override public ControlMessageHandler getMessageHandler() { - return new ControlMessageHandler() { + return new FlushControlMessageHandler(this) 
{ @Override public void handleMessage(ControlMessage message) { - handlerCalled = true; + handlerCalled++; if (message instanceof FlushControlMessage) { flush(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java index c00c823..0dc9846 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java @@ -184,7 +184,7 @@ public class PartitionedWriterTest { String record2 = "123"; writer.writeEnvelope(new RecordEnvelope(record2)); - FlushControlMessage controlMessage = new FlushControlMessage<>(new FlushControlMessage.FlushReason("test")); + FlushControlMessage controlMessage = FlushControlMessage.builder().build(); BasicAckableForTesting ackable = new BasicAckableForTesting(); controlMessage.addCallBack(ackable); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java index d00938e..620e401 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java @@ -39,7 +39,6 @@ import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.Converter; import org.apache.gobblin.converter.DataConversionException; import 
org.apache.gobblin.converter.SchemaConversionException; -import org.apache.gobblin.converter.SingleRecordIterable; import org.apache.gobblin.fork.IdentityForkOperator; import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.publisher.TaskPublisher; @@ -47,6 +46,7 @@ import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker; import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults; import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker; import org.apache.gobblin.records.ControlMessageHandler; +import org.apache.gobblin.records.FlushControlMessageHandler; import org.apache.gobblin.records.RecordStreamProcessor; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.source.extractor.Extractor; @@ -97,8 +97,8 @@ public class TestRecordStream { @Test public void testFlushControlMessages() throws Exception { MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("a"), - new FlushControlMessage(new FlushControlMessage.FlushReason("flush1")), new RecordEnvelope<>("b"), - new FlushControlMessage(new FlushControlMessage.FlushReason("flush2"))}); + FlushControlMessage.builder().flushReason("flush1").build(), new RecordEnvelope<>("b"), + FlushControlMessage.builder().flushReason("flush2").build()}); MyConverter converter = new MyConverter(); MyFlushDataWriter writer = new MyFlushDataWriter(); @@ -109,11 +109,12 @@ public class TestRecordStream { Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); Assert.assertEquals(converter.records, Lists.newArrayList("a", "b")); - Assert.assertEquals(converter.messages, Lists.newArrayList(new FlushControlMessage(new FlushControlMessage.FlushReason("flush1")), - new FlushControlMessage(new FlushControlMessage.FlushReason("flush2")))); + Assert.assertEquals(converter.messages, Lists.newArrayList( + FlushControlMessage.builder().flushReason("flush1").build(), + 
FlushControlMessage.builder().flushReason("flush2").build())); Assert.assertEquals(writer.records, Lists.newArrayList("a", "b")); - Assert.assertEquals(writer.messages, Lists.newArrayList("flush called", "flush called")); + Assert.assertEquals(writer.flush_messages, Lists.newArrayList("flush called", "flush called")); } /** @@ -187,7 +188,7 @@ public class TestRecordStream { new RecordEnvelope<>("schema:b"), new RecordEnvelope<>("schema1:c"), new RecordEnvelope<>("schema2:d")}); SchemaChangeDetectionInjector injector = new SchemaChangeDetectionInjector(); SchemaAppendConverter converter = new SchemaAppendConverter(); - MyDataWriter writer = new MyDataWriter(); + MyDataWriter writer = new MyDataWriterWithSchemaCheck(); Task task = setupTask(extractor, writer, Collections.EMPTY_LIST, Lists.newArrayList(injector, converter)); @@ -351,8 +352,9 @@ public class TestRecordStream { } static class MyDataWriter extends DataWriterBuilder<String, String> implements DataWriter<String> { - private List<String> records = new ArrayList<>(); - private List<ControlMessage<String>> messages = new ArrayList<>(); + protected List<String> records = new ArrayList<>(); + protected List<ControlMessage<String>> messages = new ArrayList<>(); + protected String writerSchema; @Override public void write(String record) throws IOException { @@ -382,6 +384,7 @@ public class TestRecordStream { @Override public DataWriter<String> build() throws IOException { + this.writerSchema = this.schema; return this; } @@ -467,7 +470,7 @@ public class TestRecordStream { String recordSchema = inputRecordEnvelope.getRecord().split(":")[0]; if (!recordSchema.equals(this.globalMetadata.getSchema())) { - return new SingleRecordIterable<>(new MetadataUpdateControlMessage<>( + return Lists.newArrayList(new MetadataUpdateControlMessage<>( GlobalMetadata.<String>builder().schema(recordSchema).build())); } @@ -486,43 +489,26 @@ public class TestRecordStream { } } - static class MyFlushDataWriter extends 
DataWriterBuilder<String, String> implements DataWriter<String> { - private List<String> records = new ArrayList<>(); - private List<String> messages = new ArrayList<>(); - - @Override - public void write(String record) throws IOException { - this.records.add(record); - } - - @Override - public void commit() throws IOException {} - - @Override - public void cleanup() throws IOException {} - - @Override - public long recordsWritten() { - return 0; - } + static class MyFlushDataWriter extends MyDataWriter { + private List<String> flush_messages = new ArrayList<>(); @Override - public long bytesWritten() throws IOException { - return 0; + public ControlMessageHandler getMessageHandler() { + return new FlushControlMessageHandler(this); } @Override - public DataWriter<String> build() throws IOException { - return this; + public void flush() throws IOException { + flush_messages.add("flush called"); } + } + static class MyDataWriterWithSchemaCheck extends MyDataWriter { @Override - public void close() throws IOException {} + public void write(String record) throws IOException { + super.write(record); - @Override - public void flush() throws IOException { - messages.add("flush called"); + Assert.assertEquals(this.writerSchema, record.split(":")[1]); } } - }
