Repository: incubator-gobblin Updated Branches: refs/heads/master 252300e99 -> 11646563a
[GOBBLIN-171] Add a writer wrapper that closes the wrapped writer and creates a new one Closes #2027 from htran1/close_on_flush_writer Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/11646563 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/11646563 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/11646563 Branch: refs/heads/master Commit: 11646563a7cb17931b68d7ad124727e6df4e5bdb Parents: 252300e Author: Hung Tran <[email protected]> Authored: Sat Jul 29 13:50:46 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Sat Jul 29 13:50:46 2017 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 3 + .../writer/InstrumentedDataWriterDecorator.java | 5 +- .../writer/CloseOnFlushWriterWrapper.java | 146 +++++++++++++++++ .../gobblin/writer/PartitionedDataWriter.java | 33 +++- .../writer/CloseOnFlushWriterWrapperTest.java | 160 +++++++++++++++++++ 5 files changed, 342 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java index bb54b5d..5bb8460 100644 --- a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java @@ -344,6 +344,9 @@ public class ConfigurationKeys { public static final String WRITER_BYTES_WRITTEN = WRITER_PREFIX + ".bytes.written"; public static final String WRITER_EARLIEST_TIMESTAMP = WRITER_PREFIX + ".earliest.timestamp"; public static final 
String WRITER_AVERAGE_TIMESTAMP = WRITER_PREFIX + ".average.timestamp"; + // Used internally to enable closing of the writer on flush + public static final String WRITER_CLOSE_ON_FLUSH_KEY = WRITER_PREFIX + ".closeOnFlush"; + public static final boolean DEFAULT_WRITER_CLOSE_ON_FLUSH = false; /** * Configuration properties used by the quality checker. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java b/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java index 5bd7b8a..3c63a8b 100644 --- a/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java +++ b/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java @@ -50,8 +50,9 @@ public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBa super(state, Optional.<Class<?>> of(DecoratorUtils.resolveUnderlyingObject(writer).getClass())); this.embeddedWriter = this.closer.register(writer); this.isEmbeddedInstrumented = Instrumented.isLineageInstrumented(writer); - if (this.embeddedWriter instanceof WatermarkAwareWriter) { - this.watermarkAwareWriter = Optional.of((WatermarkAwareWriter) this.embeddedWriter); + Object underlying = DecoratorUtils.resolveUnderlyingObject(embeddedWriter); + if (underlying instanceof WatermarkAwareWriter) { + this.watermarkAwareWriter = Optional.of((WatermarkAwareWriter) underlying); } else { this.watermarkAwareWriter = Optional.absent(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java ---------------------------------------------------------------------- diff --git 
a/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java new file mode 100644 index 0000000..746140a --- /dev/null +++ b/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.writer; + +import java.io.IOException; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.rholder.retry.RetryerBuilder; +import com.google.common.base.Preconditions; + +import gobblin.configuration.ConfigurationKeys; +import gobblin.configuration.State; +import gobblin.records.ControlMessageHandler; +import gobblin.stream.RecordEnvelope; +import gobblin.util.Decorator; +import gobblin.util.FinalState; + +/** + * The {@link CloseOnFlushWriterWrapper} closes the wrapped writer on flush and creates a new writer using a + * {@link Supplier} on the next write. After the writer is closed the reference is still available for inspection until + * a new writer is created on the next write.
+ * Closing on flush only happens when {@link ConfigurationKeys#WRITER_CLOSE_ON_FLUSH_KEY} is true (default false). + * @param <D> the type of the records written by the wrapped writer + */ +public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements Decorator, FinalState, Retriable { + private static final Logger LOG = LoggerFactory.getLogger(CloseOnFlushWriterWrapper.class); + + private final State state; + private DataWriter<D> writer; + private final Supplier<DataWriter<D>> writerSupplier; + // has the current writer been closed? If so, a new writer is created on the next write + private boolean closed; + // has the current writer been committed? Guards against committing the same writer twice + private boolean committed; + // is the close functionality enabled? + private final boolean closeOnFlush; + + /** + * @param writerSupplier creates the wrapped writer; called once here and again on the first write after each close + * @param state configuration, must not be null; {@link ConfigurationKeys#WRITER_CLOSE_ON_FLUSH_KEY} enables closing on flush + */ + public CloseOnFlushWriterWrapper(Supplier<DataWriter<D>> writerSupplier, State state) { + Preconditions.checkNotNull(state, "State is required."); + + this.state = state; + this.writerSupplier = writerSupplier; + + this.writer = writerSupplier.get(); + this.closed = false; + this.committed = false; + + this.closeOnFlush = this.state.getPropAsBoolean(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, + ConfigurationKeys.DEFAULT_WRITER_CLOSE_ON_FLUSH); + } + + @Override + public Object getDecoratedObject() { + return this.writer; + } + + @Override + public void writeEnvelope(RecordEnvelope<D> record) throws IOException { + // get a new writer if last one was closed + if (this.closed) { + this.writer = writerSupplier.get(); + this.closed = false; + this.committed = false; + } + this.writer.writeEnvelope(record); + } + + /** + * Closes the current writer. Calling this again before a new writer has been created is a no-op, so a writer + * that was already closed by {@link #flush()} is not closed a second time. + */ + @Override + public void close() throws IOException { + if (!this.closed) { + writer.close(); + this.closed = true; + } + } + + /** + * Commits the current writer. Calling this again before a new writer has been created is a no-op, so a writer + * that was already committed by {@link #flush()} is not committed a second time. + */ + @Override + public void commit() throws IOException { + if (!this.committed) { + writer.commit(); + this.committed = true; + } + } + + @Override + public void cleanup() throws IOException { + writer.cleanup(); + } + + @Override + public long recordsWritten() { + return writer.recordsWritten(); + } + + @Override + public long bytesWritten() throws IOException { + return writer.bytesWritten(); + } + + @Override + public RetryerBuilder<Void> getRetryerBuilder() { + if (writer instanceof Retriable) { + return ((Retriable) writer).getRetryerBuilder(); + } + return RetryWriter.createRetryBuilder(state); + } + + @Override + public State getFinalState() { + State finalState = new State(); + + if (this.writer instanceof
FinalState) { + finalState.addAll(((FinalState)this.writer).getFinalState()); + } else { + LOG.warn("Wrapped writer does not implement FinalState: " + this.writer.getClass()); + } + + return finalState; + } + + @Override + public ControlMessageHandler getMessageHandler() { + return this.writer.getMessageHandler(); + } + + /** + * Flushes the current writer. If close on flush is enabled, the writer is also committed and closed; the next + * write then creates a new writer through the supplier. + * This is a no-op if the current writer has already been closed (e.g. two flushes with no write in between), so + * a closed writer is never flushed, committed, or closed a second time. + * @throws IOException if flushing, committing, or closing the wrapped writer fails + */ + @Override + public void flush() throws IOException { + if (this.closed) { + // nothing has been written since the writer was closed + return; + } + this.writer.flush(); + + // commit data then close the writer + if (this.closeOnFlush) { + commit(); + close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java index 2cc8169..f5cb017 100644 --- a/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java @@ -20,6 +20,7 @@ package gobblin.writer; import java.io.IOException; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; @@ -81,8 +82,19 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin @Override public DataWriter<D> load(final GenericRecord key) throws Exception { + /* wrap the data writer to allow the option to close the writer on flush */ return PartitionedDataWriter.this.closer - .register(new InstrumentedPartitionedDataWriterDecorator<>(createPartitionWriter(key), state, key)); + .register(new InstrumentedPartitionedDataWriterDecorator<>( + new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() { + @Override + public
DataWriter<D> get() { + try { + return createPartitionWriter(key); + } catch (IOException e) { + throw new RuntimeException("Error creating writer", e); + } + } + }, state), state, key)); } }); @@ -106,9 +118,24 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin } } else { this.shouldPartition = false; - DataWriter<D> dataWriter = builder.build(); + // Support configuration to close the DataWriter on flush to allow publishing intermediate results in a task + CloseOnFlushWriterWrapper closeOnFlushWriterWrapper = + new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() { + @Override + public DataWriter<D> get() { + try { + return builder.withWriterId(PartitionedDataWriter.this.baseWriterId + "_" + + PartitionedDataWriter.this.writerIdSuffix++).build(); + } catch (IOException e) { + throw new RuntimeException("Error creating writer", e); + } + } + }, state); + DataWriter<D> dataWriter = (DataWriter)closeOnFlushWriterWrapper.getDecoratedObject(); + InstrumentedDataWriterDecorator<D> writer = - this.closer.register(new InstrumentedDataWriterDecorator<>(dataWriter, state)); + this.closer.register(new InstrumentedDataWriterDecorator<>(closeOnFlushWriterWrapper, state)); + this.isSpeculativeAttemptSafe = this.isDataWriterForPartitionSafe(dataWriter); this.isWatermarkCapable = this.isDataWriterWatermarkCapable(dataWriter); this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, writer); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java new file mode 100644 index 0000000..84a81ec --- /dev/null +++ b/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java @@ -0,0 +1,160 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.writer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import gobblin.configuration.ConfigurationKeys; +import gobblin.configuration.WorkUnitState; +import gobblin.stream.RecordEnvelope; + +/** + * Tests for {@link CloseOnFlushWriterWrapper}: flush with close-on-flush disabled, flush with close-on-flush enabled, + * and creation of a new wrapped writer on the first write after a close-on-flush. + */ +public class CloseOnFlushWriterWrapperTest { + + @Test + public void testCloseOnFlushDisabled() + throws IOException { + WorkUnitState state = new WorkUnitState(); + List<DummyWriter> dummyWriters = new ArrayList<>(); + CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state); + + byte[] record = new byte[]{'a', 'b', 'c', 'd'}; + + writer.writeEnvelope(new RecordEnvelope<>(record)); + writer.flush(); + + Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); + Assert.assertEquals(dummyWriters.get(0).flushCount, 1); + Assert.assertFalse(dummyWriters.get(0).closed); + Assert.assertFalse(dummyWriters.get(0).committed); + } + + @Test + public void testCloseOnFlushEnabled() + throws IOException { + WorkUnitState state = new WorkUnitState(); +
state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true"); + List<DummyWriter> dummyWriters = new ArrayList<>(); + CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state); + + byte[] record = new byte[]{'a', 'b', 'c', 'd'}; + + writer.writeEnvelope(new RecordEnvelope<>(record)); + writer.flush(); + + Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); + Assert.assertEquals(dummyWriters.get(0).flushCount, 1); + Assert.assertTrue(dummyWriters.get(0).closed); + Assert.assertTrue(dummyWriters.get(0).committed); + } + + @Test + public void testWriteAfterFlush() + throws IOException { + WorkUnitState state = new WorkUnitState(); + state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true"); + List<DummyWriter> dummyWriters = new ArrayList<>(); + CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state); + + byte[] record = new byte[]{'a', 'b', 'c', 'd'}; + + writer.writeEnvelope(new RecordEnvelope<>(record)); + writer.flush(); + + Assert.assertEquals(dummyWriters.size(), 1); + Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); + Assert.assertEquals(dummyWriters.get(0).flushCount, 1); + Assert.assertTrue(dummyWriters.get(0).closed); + Assert.assertTrue(dummyWriters.get(0).committed); + + writer.writeEnvelope(new RecordEnvelope<>(record)); + writer.flush(); + + Assert.assertEquals(dummyWriters.size(), 2); + Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1); + Assert.assertEquals(dummyWriters.get(1).flushCount, 1); + Assert.assertTrue(dummyWriters.get(1).closed); + Assert.assertTrue(dummyWriters.get(1).committed); + } + + /** + * Creates a wrapper whose supplier builds a new {@link DummyWriter} on every call and appends it to + * {@code dummyWriters}, so tests can inspect each writer the wrapper created. + */ + private CloseOnFlushWriterWrapper<byte[]> getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) { + return new CloseOnFlushWriterWrapper<>(new Supplier<DataWriter<byte[]>>() { + @Override + public DataWriter<byte[]> get() { + DummyWriter writer = new DummyWriter(); +
dummyWriters.add(writer); + return writer; + } + }, state.getJobState()); + } + + /** + * In-memory {@link DataWriter} that counts records and flushes and records whether it was committed or closed. + */ + private static class DummyWriter implements DataWriter<byte[]> { + private int recordsSeen = 0; + private int flushCount = 0; + private boolean committed = false; + private boolean closed = false; + + DummyWriter() { + } + + @Override + public void write(byte[] record) + throws IOException { + this.recordsSeen++; + } + + @Override + public void commit() + throws IOException { + this.committed = true; + } + + @Override + public void cleanup() + throws IOException { + } + + @Override + public long recordsWritten() { + return this.recordsSeen; + } + + @Override + public long bytesWritten() + throws IOException { + return 0; + } + + @Override + public void close() + throws IOException { + this.closed = true; + } + + @Override + public void flush() { + this.flushCount++; + } + } +}
