Repository: incubator-gobblin Updated Branches: refs/heads/master 14c1b03bb -> 3a035737f
[GOBBLIN-236] Add a ControlMessage injector as a RecordStreamProcessor Closes #2090 from htran1/control_message_injection Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3a035737 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3a035737 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3a035737 Branch: refs/heads/master Commit: 3a035737f72ce2bffbbe58beac2044b3b46da992 Parents: 14c1b03 Author: Hung Tran <[email protected]> Authored: Thu Sep 14 12:54:23 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Sep 14 12:54:23 2017 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 1 + .../org/apache/gobblin/converter/Converter.java | 29 ++- .../java/org/apache/gobblin/fork/Forker.java | 6 +- .../apache/gobblin/metadata/GlobalMetadata.java | 69 +++++++ .../records/RecordStreamWithMetadata.java | 20 +- .../gobblin/source/extractor/Extractor.java | 3 +- .../gobblin/stream/ControlMessageInjector.java | 143 ++++++++++++++ .../stream/MetadataUpdateControlMessage.java | 41 ++++ .../apache/gobblin/converter/ConverterTest.java | 13 +- .../org/apache/gobblin/fork/ForkerTest.java | 4 +- .../gobblin/converter/AsyncConverter1to1.java | 16 +- .../extractor/InstrumentedExtractorBase.java | 4 +- .../converter/AsyncConverter1to1Test.java | 7 +- .../gobblin/runtime/StreamModelTaskRunner.java | 23 ++- .../java/org/apache/gobblin/runtime/Task.java | 50 +++-- .../org/apache/gobblin/runtime/TaskContext.java | 57 ++++++ .../gobblin/runtime/TestRecordStream.java | 188 ++++++++++++++++++- 17 files changed, 628 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java 
---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index c986fd6..32d4729 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -147,6 +147,7 @@ public class ConfigurationKeys { public static final String TASK_DATA_ROOT_DIR_KEY = "task.data.root.dir"; public static final String SOURCE_CLASS_KEY = "source.class"; public static final String CONVERTER_CLASSES_KEY = "converter.classes"; + public static final String RECORD_STREAM_PROCESSOR_CLASSES_KEY = "recordStreamProcessor.classes"; public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class"; public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator"; public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java index 82cf10e..b4a45b7 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java @@ -25,15 +25,19 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.initializer.ConverterInitializer; import org.apache.gobblin.converter.initializer.NoopConverterInitializer; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.stream.ControlMessage; import 
org.apache.gobblin.records.ControlMessageHandler; import org.apache.gobblin.records.RecordStreamProcessor; import org.apache.gobblin.records.RecordStreamWithMetadata; +import org.apache.gobblin.stream.MetadataUpdateControlMessage; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.source.workunit.WorkUnitStream; import org.apache.gobblin.stream.StreamEntity; import org.apache.gobblin.util.FinalState; +import com.google.common.base.Optional; + import io.reactivex.Flowable; @@ -55,6 +59,9 @@ import io.reactivex.Flowable; * @param <DO> output data type */ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState, RecordStreamProcessor<SI, SO, DI, DO> { + // Metadata containing the output schema. This may be changed when a MetadataUpdateControlMessage is received. + private GlobalMetadata<SO> outputGlobalMetadata; + /** * Initialize this {@link Converter}. * @@ -120,16 +127,30 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> inputStream, WorkUnitState workUnitState) throws SchemaConversionException { init(workUnitState); - SO outputSchema = convertSchema(inputStream.getSchema(), workUnitState); + this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(), + Optional.of(convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState))).build(); Flowable<StreamEntity<DO>> outputStream = inputStream.getRecordStream() .flatMap(in -> { if (in instanceof ControlMessage) { + ControlMessage out = (ControlMessage) in; + getMessageHandler().handleMessage((ControlMessage) in); - return Flowable.just(((ControlMessage<DO>) in)); + + // update the output schema with the new input schema from the MetadataUpdateControlMessage + if (in instanceof MetadataUpdateControlMessage) { + this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput( + 
((MetadataUpdateControlMessage) in).getGlobalMetadata(), + Optional.of(convertSchema((SI)((MetadataUpdateControlMessage) in).getGlobalMetadata() + .getSchema(), workUnitState))).build(); + out = new MetadataUpdateControlMessage<SO, DO>(this.outputGlobalMetadata); + } + + return Flowable.just(((ControlMessage<DO>) out)); } else if (in instanceof RecordEnvelope) { RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in; - Iterator<DO> convertedIterable = convertRecord(outputSchema, recordEnvelope.getRecord(), workUnitState).iterator(); + Iterator<DO> convertedIterable = convertRecord(this.outputGlobalMetadata.getSchema(), + recordEnvelope.getRecord(), workUnitState).iterator(); if (!convertedIterable.hasNext()) { // if the iterable is empty, ack the record, return an empty flowable @@ -153,7 +174,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState } }, 1); outputStream = outputStream.doOnComplete(this::close); - return inputStream.withRecordStream(outputStream, outputSchema); + return inputStream.withRecordStream(outputStream, this.outputGlobalMetadata); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java b/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java index d70e5ba..d077f78 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.stream.ControlMessage; import 
org.apache.gobblin.stream.RecordEnvelope; @@ -59,7 +60,7 @@ public class Forker { workUnitState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, branches); forkOperator.init(workUnitState); - List<Boolean> forkedSchemas = forkOperator.forkSchema(workUnitState, inputStream.getSchema()); + List<Boolean> forkedSchemas = forkOperator.forkSchema(workUnitState, inputStream.getGlobalMetadata().getSchema()); int activeForks = (int) forkedSchemas.stream().filter(b -> b).count(); Preconditions.checkState(forkedSchemas.size() == branches, String @@ -90,7 +91,8 @@ public class Forker { Flowable<StreamEntity<D>> thisStream = forkedStream.filter(new ForkFilter<>(idx)).map(RecordWithForkMap::getRecordCopyIfNecessary); forkStreams.add(inputStream.withRecordStream(thisStream, - mustCopy ? (S) CopyHelper.copy(inputStream.getSchema()) : inputStream.getSchema())); + mustCopy ? (GlobalMetadata<S>) CopyHelper.copy(inputStream.getGlobalMetadata()) : + inputStream.getGlobalMetadata())); } else { forkStreams.add(null); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java b/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java new file mode 100644 index 0000000..48a6943 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.metadata; + +import org.apache.gobblin.fork.CopyHelper; +import org.apache.gobblin.fork.CopyNotSupportedException; +import org.apache.gobblin.fork.Copyable; + +import com.google.common.base.Optional; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; + + +/** + * Global metadata + * @param <S> schema type + */ +@AllArgsConstructor(access=AccessLevel.PRIVATE) +@EqualsAndHashCode +@Builder +public class GlobalMetadata<S> implements Copyable<GlobalMetadata<S>> { + @Getter + private S schema; + + @Override + public GlobalMetadata<S> copy() throws CopyNotSupportedException { + if (CopyHelper.isCopyable(schema)) { + return new GlobalMetadata((S)CopyHelper.copy(schema)); + } + + throw new CopyNotSupportedException("Type is not copyable: " + schema.getClass().getName()); + } + + /** + * Builder that takes in an input {@link GlobalMetadata} to use as a base. 
+ * @param inputMetadata input metadata + * @param outputSchema output schema to set in the builder + * @param <SI> input schema type + * @param <SO> output schema type + * @return builder + */ + public static <SI, SO> GlobalMetadataBuilder<SO> builderWithInput(GlobalMetadata<SI> inputMetadata, Optional<SO> outputSchema) { + GlobalMetadataBuilder<SO> builder = (GlobalMetadataBuilder<SO>) builder(); + + if (outputSchema.isPresent()) { + builder.schema(outputSchema.get()); + } + + return builder; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java b/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java index 57c1e7c..c5dda8f 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java @@ -19,6 +19,7 @@ package org.apache.gobblin.records; import java.util.function.Function; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.stream.StreamEntity; @@ -34,20 +35,29 @@ import lombok.Data; @Data public class RecordStreamWithMetadata<D, S> { private final Flowable<StreamEntity<D>> recordStream; - private final S schema; + private final GlobalMetadata<S> globalMetadata; /** * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} but same schema. 
*/ public <DO> RecordStreamWithMetadata<DO, S> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream) { - return withRecordStream(newRecordStream, this.schema); + return withRecordStream(newRecordStream, this.globalMetadata); } /** - * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #schema}. + * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #globalMetadata}. */ + @Deprecated public <DO, SO> RecordStreamWithMetadata<DO, SO> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream, SO newSchema) { - return new RecordStreamWithMetadata<>(newRecordStream, newSchema); + return new RecordStreamWithMetadata<>(newRecordStream, GlobalMetadata.<SO>builder().schema(newSchema).build()); + } + + /** + * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #globalMetadata}. + */ + public <DO, SO> RecordStreamWithMetadata<DO, SO> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream, + GlobalMetadata<SO> newGlobalMetadata) { + return new RecordStreamWithMetadata<>(newRecordStream, newGlobalMetadata); } /** @@ -56,7 +66,7 @@ public class RecordStreamWithMetadata<D, S> { */ public <DO> RecordStreamWithMetadata<DO, S> mapStream(Function<? super Flowable<StreamEntity<D>>, ? 
extends Flowable<StreamEntity<DO>>> transform) { - return new RecordStreamWithMetadata<>(transform.apply(this.recordStream), this.schema); + return new RecordStreamWithMetadata<>(transform.apply(this.recordStream), this.globalMetadata); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java index 22b1670..9749795 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.stream.StreamEntity; @@ -128,7 +129,7 @@ public interface Extractor<S, D> extends Closeable { } }); recordStream = recordStream.doFinally(this::close); - return new RecordStreamWithMetadata<>(recordStream, schema); + return new RecordStreamWithMetadata<>(recordStream, GlobalMetadata.<S>builder().schema(schema).build()); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java new file mode 100644 index 0000000..93ac0ae --- /dev/null +++ 
b/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.stream; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metadata.GlobalMetadata; +import org.apache.gobblin.records.ControlMessageHandler; +import org.apache.gobblin.records.RecordStreamProcessor; +import org.apache.gobblin.records.RecordStreamWithMetadata; + +import io.reactivex.Flowable; + +/** + * A {@link RecordStreamProcessor} that inspects an input record and outputs control messages before, after, or around + * the input record + * @param <SI> + * @param <DI> + */ +public abstract class ControlMessageInjector<SI, DI> implements Closeable, + RecordStreamProcessor<SI, SI, DI, DI> { + + /** + * Initialize this {@link ControlMessageInjector}. 
+ * + * @param workUnitState a {@link WorkUnitState} object carrying configuration properties + * @return an initialized {@link ControlMessageInjector} instance + */ + protected ControlMessageInjector<SI, DI> init(WorkUnitState workUnitState) { + return this; + } + + @Override + public void close() throws IOException { + } + + /** + * Set the global metadata of the input messages. The base implementation is empty and should be overridden by + * the subclasses that need to store the input {@link GlobalMetadata} + * @param inputGlobalMetadata the global metadata for input messages + * @param workUnitState + */ + protected void setInputGlobalMetadata(GlobalMetadata<SI> inputGlobalMetadata, WorkUnitState workUnitState) { + } + + /** + * Inject {@link ControlMessage}s before the record + * @param inputRecordEnvelope + * @param workUnitState + * @return The {@link ControlMessage}s to inject before the record + */ + protected abstract Iterable<ControlMessage<DI>> injectControlMessagesBefore(RecordEnvelope<DI> inputRecordEnvelope, + WorkUnitState workUnitState); + + /** + * Inject {@link ControlMessage}s after the record + * @param inputRecordEnvelope + * @param workUnitState + * @return The {@link ControlMessage}s to inject after the record + */ + protected abstract Iterable<ControlMessage<DI>> injectControlMessagesAfter(RecordEnvelope<DI> inputRecordEnvelope, + WorkUnitState workUnitState); + + /** + * Apply injections to the input {@link RecordStreamWithMetadata}. + * {@link ControlMessage}s may be injected before, after, or around the input record. + * A {@link MetadataUpdateControlMessage} will update the current input {@link GlobalMetadata} and pass the + * updated input {@link GlobalMetadata} to the next processor to propagate the metadata update down the pipeline. 
+ */ + @Override + public RecordStreamWithMetadata<DI, SI> processStream(RecordStreamWithMetadata<DI, SI> inputStream, + WorkUnitState workUnitState) throws StreamProcessingException { + init(workUnitState); + + setInputGlobalMetadata(inputStream.getGlobalMetadata(), workUnitState); + + Flowable<StreamEntity<DI>> outputStream = + inputStream.getRecordStream() + .flatMap(in -> { + if (in instanceof ControlMessage) { + if (in instanceof MetadataUpdateControlMessage) { + setInputGlobalMetadata(((MetadataUpdateControlMessage) in).getGlobalMetadata(), workUnitState); + } + + getMessageHandler().handleMessage((ControlMessage) in); + return Flowable.just(in); + } else if (in instanceof RecordEnvelope) { + RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in; + Iterable<ControlMessage<DI>> injectedBeforeIterable = + injectControlMessagesBefore(recordEnvelope, workUnitState); + Iterable<ControlMessage<DI>> injectedAfterIterable = + injectControlMessagesAfter(recordEnvelope, workUnitState); + + if (injectedBeforeIterable == null && injectedAfterIterable == null) { + // nothing injected so return the record envelope + return Flowable.just(recordEnvelope); + } else { + Flowable<StreamEntity<DI>> flowable; + + if (injectedBeforeIterable != null) { + flowable = Flowable.<StreamEntity<DI>>fromIterable(injectedBeforeIterable) + .concatWith(Flowable.just(recordEnvelope)); + } else { + flowable = Flowable.just(recordEnvelope); + } + + if (injectedAfterIterable != null) { + flowable.concatWith(Flowable.fromIterable(injectedAfterIterable)); + } + return flowable; + } + } else { + throw new UnsupportedOperationException(); + } + }, 1); + outputStream = outputStream.doOnComplete(this::close); + return inputStream.withRecordStream(outputStream, inputStream.getGlobalMetadata()); + } + + /** + * @return {@link ControlMessageHandler} to call for each {@link ControlMessage} received. 
+ */ + protected ControlMessageHandler getMessageHandler() { + return ControlMessageHandler.NOOP; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java new file mode 100644 index 0000000..7edd991 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.stream; + +import org.apache.gobblin.metadata.GlobalMetadata; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; + + +/** + * Control message for updating the {@link GlobalMetadata} used for processing records + * @param <S> schema type + * @param <D> data type + */ +@AllArgsConstructor +@EqualsAndHashCode +public class MetadataUpdateControlMessage<S, D> extends ControlMessage<D> { + @Getter + private GlobalMetadata<S> globalMetadata; + + @Override + protected StreamEntity<D> buildClone() { + return new MetadataUpdateControlMessage(this.globalMetadata); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java index f5f0ed5..00ac4f9 100644 --- a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java +++ b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import org.apache.gobblin.ack.BasicAckableForTesting; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.stream.ControlMessage; import org.apache.gobblin.stream.RecordEnvelope; @@ -42,7 +43,8 @@ public class ConverterTest { BasicAckableForTesting ackable = new BasicAckableForTesting(); RecordStreamWithMetadata<Integer, String> stream = - new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(0)), "schema").mapRecords(r -> { + new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(0)), + 
GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> { r.addCallBack(ackable); return r; }); @@ -60,7 +62,8 @@ public class ConverterTest { BasicAckableForTesting ackable = new BasicAckableForTesting(); RecordStreamWithMetadata<Integer, String> stream = - new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1)), "schema").mapRecords(r -> { + new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1)), + GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> { r.addCallBack(ackable); return r; }); @@ -81,7 +84,8 @@ public class ConverterTest { BasicAckableForTesting ackable = new BasicAckableForTesting(); RecordStreamWithMetadata<Integer, String> stream = - new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(2)), "schema").mapRecords(r -> { + new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(2)), + GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> { r.addCallBack(ackable); return r; }); @@ -105,7 +109,8 @@ public class ConverterTest { BasicAckableForTesting ackable = new BasicAckableForTesting(); RecordStreamWithMetadata<Integer, String> stream = - new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1), new MyControlMessage<>()), "schema").mapRecords(r -> { + new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1), new MyControlMessage<>()), + GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> { r.addCallBack(ackable); return r; }); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java b/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java index 1c8beb1..30125f3 100644 --- a/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java +++ 
b/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java @@ -32,6 +32,7 @@ import com.google.common.collect.Lists; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.runtime.BasicTestControlMessage; import org.apache.gobblin.stream.RecordEnvelope; @@ -48,7 +49,8 @@ public class ForkerTest { Forker forker = new Forker(); MyFlowable<StreamEntity<byte[]>> flowable = new MyFlowable<>(); - RecordStreamWithMetadata<byte[], String> stream = new RecordStreamWithMetadata<>(flowable, "schema"); + RecordStreamWithMetadata<byte[], String> stream = + new RecordStreamWithMetadata<>(flowable, GlobalMetadata.<String>builder().schema("schema").build()); WorkUnitState workUnitState = new WorkUnitState(); workUnitState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, "3"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java index 7eb1563..b5092ef 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java @@ -19,9 +19,12 @@ package org.apache.gobblin.converter; import java.util.concurrent.CompletableFuture; +import com.google.common.base.Optional; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metadata.GlobalMetadata; import 
org.apache.gobblin.stream.ControlMessage; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.stream.RecordEnvelope; @@ -64,12 +67,20 @@ public abstract class AsyncConverter1to1<SI, SO, DI, DO> extends Converter<SI, S protected abstract CompletableFuture<DO> convertRecordAsync(SO outputSchema, DI inputRecord, WorkUnitState workUnit) throws DataConversionException; + /** + * Return a {@link RecordStreamWithMetadata} with the appropriate modifications. + * @param inputStream + * @param workUnitState + * @return + * @throws SchemaConversionException + * @implNote this processStream does not handle {@link org.apache.gobblin.stream.MetadataUpdateControlMessage}s + */ @Override public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> inputStream, WorkUnitState workUnitState) throws SchemaConversionException { int maxConcurrentAsyncConversions = workUnitState.getPropAsInt(MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY, DEFAULT_MAX_CONCURRENT_ASYNC_CONVERSIONS); - SO outputSchema = convertSchema(inputStream.getSchema(), workUnitState); + SO outputSchema = convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState); Flowable<StreamEntity<DO>> outputStream = inputStream.getRecordStream() .flatMapSingle(in -> { @@ -83,7 +94,8 @@ public abstract class AsyncConverter1to1<SI, SO, DI, DO> extends Converter<SI, S throw new IllegalStateException("Expected ControlMessage or RecordEnvelope."); } }, false, maxConcurrentAsyncConversions); - return inputStream.withRecordStream(outputStream, outputSchema); + return inputStream.withRecordStream(outputStream, GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(), + Optional.of(outputSchema)).build()); } @RequiredArgsConstructor http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java 
---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java index 35de712..1dcb1d1 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java @@ -34,6 +34,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.instrumented.Instrumentable; import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.MetricNames; @@ -190,7 +191,8 @@ public abstract class InstrumentedExtractorBase<S, D> } }); recordStream = recordStream.doFinally(this::close); - return new RecordStreamWithMetadata<>(recordStream, schema); + return new RecordStreamWithMetadata<>(recordStream, GlobalMetadata.<S>builder().schema(schema).build()); + } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java index 01ec3b4..77ae3ab 100644 --- a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java +++ b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java @@ -31,6 +31,7 @@ import com.google.common.collect.Maps; import 
com.google.common.collect.Sets; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.util.ExponentialBackoff; @@ -52,7 +53,8 @@ public class AsyncConverter1to1Test { workUnitState.setProp(AsyncConverter1to1.MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY, 3); RecordStreamWithMetadata<String, String> stream = - new RecordStreamWithMetadata<>(Flowable.range(0, 5).map(i -> i.toString()).map(RecordEnvelope::new), "schema"); + new RecordStreamWithMetadata<>(Flowable.range(0, 5).map(i -> i.toString()).map(RecordEnvelope::new), + GlobalMetadata.<String>builder().schema("schema").build()); Set<String> outputRecords = Sets.newConcurrentHashSet(); @@ -106,7 +108,8 @@ public class AsyncConverter1to1Test { workUnitState.setProp(AsyncConverter1to1.MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY, 3); RecordStreamWithMetadata<String, String> stream = - new RecordStreamWithMetadata<>(Flowable.just("0", MyAsyncConverter1to1.FAIL, "1").map(RecordEnvelope::new), "schema"); + new RecordStreamWithMetadata<>(Flowable.just("0", MyAsyncConverter1to1.FAIL, "1").map(RecordEnvelope::new), + GlobalMetadata.<String>builder().schema("schema").build()); Set<String> outputRecords = Sets.newConcurrentHashSet(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java index 2b0b47b..7713b79 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java @@ -17,6 +17,7 
@@ package org.apache.gobblin.runtime; +import java.util.List; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -32,6 +33,7 @@ import org.apache.gobblin.converter.Converter; import org.apache.gobblin.fork.ForkOperator; import org.apache.gobblin.fork.Forker; import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker; +import org.apache.gobblin.records.RecordStreamProcessor; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.runtime.fork.Fork; import org.apache.gobblin.source.extractor.Extractor; @@ -61,6 +63,7 @@ public class StreamModelTaskRunner { private final TaskContext taskContext; private final Extractor extractor; private final Converter converter; + private final List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors; private final RowLevelPolicyChecker rowChecker; private final TaskExecutor taskExecutor; private final ExecutionModel taskMode; @@ -102,13 +105,21 @@ public class StreamModelTaskRunner { return r; }); } - if (this.converter instanceof MultiConverter) { - // if multiconverter, unpack it - for (Converter cverter : ((MultiConverter) this.converter).getConverters()) { - stream = cverter.processStream(stream, this.taskState); + + // Use the recordStreamProcessor list if it is configured. 
This list can contain both all RecordStreamProcessor types + if (!this.recordStreamProcessors.isEmpty()) { + for (RecordStreamProcessor streamProcessor : this.recordStreamProcessors) { + stream = streamProcessor.processStream(stream, this.taskState); } } else { - stream = this.converter.processStream(stream, this.taskState); + if (this.converter instanceof MultiConverter) { + // if multiconverter, unpack it + for (Converter cverter : ((MultiConverter) this.converter).getConverters()) { + stream = cverter.processStream(stream, this.taskState); + } + } else { + stream = this.converter.processStream(stream, this.taskState); + } } stream = this.rowChecker.processStream(stream, this.taskState); @@ -124,7 +135,7 @@ public class StreamModelTaskRunner { if (isForkAsync) { forkedStream = forkedStream.mapStream(f -> f.observeOn(Schedulers.from(this.taskExecutor.getForkExecutor()), false, bufferSize)); } - Fork fork = new Fork(this.taskContext, forkedStream.getSchema(), forkedStreams.getForkedStreams().size(), fidx, this.taskMode); + Fork fork = new Fork(this.taskContext, forkedStream.getGlobalMetadata().getSchema(), forkedStreams.getForkedStreams().size(), fidx, this.taskMode); fork.consumeRecordStream(forkedStream); this.forks.put(Optional.of(fork), Optional.of(Futures.immediateFuture(null))); this.task.configureStreamingFork(fork, this.watermarkingStrategy); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index 69c9a1c..65cf611 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -17,6 +17,7 @@ package org.apache.gobblin.runtime; +import java.io.Closeable; import 
java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -29,12 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.BooleanUtils; -import org.apache.gobblin.converter.DataConversionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -43,14 +44,13 @@ import com.google.common.io.Closer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import lombok.NoArgsConstructor; - import org.apache.gobblin.Constructs; import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.Converter; +import org.apache.gobblin.converter.DataConversionException; import org.apache.gobblin.fork.CopyHelper; import org.apache.gobblin.fork.CopyNotSupportedException; import org.apache.gobblin.fork.Copyable; @@ -64,6 +64,7 @@ import org.apache.gobblin.publisher.DataPublisher; import org.apache.gobblin.publisher.SingleTaskDataPublisher; import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults; import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker; +import org.apache.gobblin.records.RecordStreamProcessor; import org.apache.gobblin.runtime.fork.AsynchronousFork; import org.apache.gobblin.runtime.fork.Fork; import org.apache.gobblin.runtime.fork.SynchronousFork; @@ -71,18 +72,13 @@ import org.apache.gobblin.runtime.task.TaskIFace; import org.apache.gobblin.runtime.util.TaskMetrics; import org.apache.gobblin.source.extractor.Extractor; import 
org.apache.gobblin.source.extractor.JobCommitPolicy; -import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.source.extractor.StreamingExtractor; import org.apache.gobblin.state.ConstructState; +import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.writer.AcknowledgableWatermark; -import org.apache.gobblin.writer.DataWriter; -import org.apache.gobblin.writer.FineGrainedWatermarkTracker; -import org.apache.gobblin.writer.MultiWriterWatermarkManager; -import org.apache.gobblin.writer.TrackerBasedWatermarkManager; -import org.apache.gobblin.writer.WatermarkAwareWriter; -import org.apache.gobblin.writer.WatermarkManager; -import org.apache.gobblin.writer.WatermarkStorage; +import org.apache.gobblin.writer.*; + +import lombok.NoArgsConstructor; /** @@ -139,6 +135,7 @@ public class Task implements TaskIFace { private final Optional<WatermarkManager> watermarkManager; private final Optional<FineGrainedWatermarkTracker> watermarkTracker; private final Optional<WatermarkStorage> watermarkStorage; + private final List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors; private final Closer closer; @@ -173,7 +170,32 @@ public class Task implements TaskIFace { this.extractor = closer.register(new InstrumentedExtractorDecorator<>(this.taskState, this.taskContext.getExtractor())); - this.converter = closer.register(new MultiConverter(this.taskContext.getConverters())); + this.recordStreamProcessors = this.taskContext.getRecordStreamProcessors(); + + // add record stream processors to closer if they are closeable + for (RecordStreamProcessor r: recordStreamProcessors) { + if (r instanceof Closeable) { + this.closer.register((Closeable)r); + } + } + + List<Converter<?,?,?,?>> converters = this.taskContext.getConverters(); + + this.converter = closer.register(new MultiConverter(converters)); + + // can't have both record stream processors and converter lists configured + try { + 
Preconditions.checkState(this.recordStreamProcessors.isEmpty() || converters.isEmpty(), + "Converters cannot be specified when RecordStreamProcessors are specified"); + } catch (IllegalStateException e) { + try { + closer.close(); + } catch (Throwable t) { + LOG.error("Failed to close all open resources", t); + } + throw new TaskInstantiationException("Converters cannot be specified when RecordStreamProcessors are specified"); + } + try { this.rowChecker = closer.register(this.taskContext.getRowLevelPolicyChecker()); } catch (Exception e) { @@ -314,7 +336,7 @@ public class Task implements TaskIFace { runSynchronousModel(); } else { new StreamModelTaskRunner(this, this.taskState, this.closer, this.taskContext, this.extractor, - this.converter, this.rowChecker, this.taskExecutor, this.taskMode, this.shutdownRequested, + this.converter, this.recordStreamProcessors, this.rowChecker, this.taskExecutor, this.taskMode, this.shutdownRequested, this.watermarkTracker, this.watermarkManager, this.watermarkStorage, this.forks, this.watermarkingStrategy).run(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java index 590cdf9..7b71eba 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java @@ -41,6 +41,7 @@ import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckerBuilderFactory import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults; import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker; import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckerBuilderFactory; +import 
org.apache.gobblin.records.RecordStreamProcessor; import org.apache.gobblin.runtime.util.TaskMetrics; import org.apache.gobblin.source.Source; import org.apache.gobblin.source.extractor.Extractor; @@ -232,6 +233,62 @@ public class TaskContext { } /** + * Get the list of pre-fork {@link RecordStreamProcessor}s. + * + * @return list (possibly empty) of {@link RecordStreamProcessor}s + */ + public List<RecordStreamProcessor<?, ?, ?, ?>> getRecordStreamProcessors() { + return getRecordStreamProcessors(-1, this.taskState); + } + + /** + * Get the list of post-fork {@link RecordStreamProcessor}s for a given branch. + * + * @param index branch index + * @param forkTaskState a {@link TaskState} instance specific to the fork identified by the branch index + * @return list (possibly empty) of {@link RecordStreamProcessor}s + */ + @SuppressWarnings("unchecked") + public List<RecordStreamProcessor<?, ?, ?, ?>> getRecordStreamProcessors(int index, TaskState forkTaskState) { + String streamProcessorClassKey = + ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.RECORD_STREAM_PROCESSOR_CLASSES_KEY, index); + + if (!this.taskState.contains(streamProcessorClassKey)) { + return Collections.emptyList(); + } + + if (index >= 0) { + forkTaskState.setProp(ConfigurationKeys.FORK_BRANCH_ID_KEY, index); + } + + List<RecordStreamProcessor<?, ?, ?, ?>> streamProcessors = Lists.newArrayList(); + for (String streamProcessorClass : Splitter.on(",").omitEmptyStrings().trimResults() + .split(this.taskState.getProp(streamProcessorClassKey))) { + try { + RecordStreamProcessor<?, ?, ?, ?> streamProcessor = + RecordStreamProcessor.class.cast(Class.forName(streamProcessorClass).newInstance()); + + if (streamProcessor instanceof Converter) { + InstrumentedConverterDecorator instrumentedConverter = + new InstrumentedConverterDecorator<>((Converter)streamProcessor); + instrumentedConverter.init(forkTaskState); + streamProcessors.add(instrumentedConverter); + } else { + 
streamProcessors.add(streamProcessor); + } + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException(cnfe); + } catch (InstantiationException ie) { + throw new RuntimeException(ie); + } catch (IllegalAccessException iae) { + throw new RuntimeException(iae); + } + } + + return streamProcessors; + } + + /** * Get the {@link ForkOperator} to be applied to converted input schema and data record. * * @return {@link ForkOperator} to be used or <code>null</code> if none is specified http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java index c2f0cdb..d00938e 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java @@ -19,6 +19,7 @@ package org.apache.gobblin.runtime; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -38,27 +39,33 @@ import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.Converter; import org.apache.gobblin.converter.DataConversionException; import org.apache.gobblin.converter.SchemaConversionException; +import org.apache.gobblin.converter.SingleRecordIterable; import org.apache.gobblin.fork.IdentityForkOperator; +import org.apache.gobblin.metadata.GlobalMetadata; import org.apache.gobblin.publisher.TaskPublisher; import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker; import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults; import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker; 
import org.apache.gobblin.records.ControlMessageHandler; +import org.apache.gobblin.records.RecordStreamProcessor; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.stream.ControlMessage; +import org.apache.gobblin.stream.ControlMessageInjector; import org.apache.gobblin.stream.FlushControlMessage; +import org.apache.gobblin.stream.MetadataUpdateControlMessage; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.stream.StreamEntity; import org.apache.gobblin.writer.DataWriter; import org.apache.gobblin.writer.DataWriterBuilder; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.*; import io.reactivex.Flowable; import lombok.AllArgsConstructor; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; /** @@ -109,6 +116,97 @@ public class TestRecordStream { Assert.assertEquals(writer.messages, Lists.newArrayList("flush called", "flush called")); } + /** + * Test of metadata update control messages that signal the converters to change schemas + * @throws Exception + */ + @Test + public void testMetadataUpdateControlMessages() throws Exception { + + MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("a"), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()), new RecordEnvelope<>("b"), + new MetadataUpdateControlMessage(GlobalMetadata.<String>builder().schema("Schema2").build())}); + SchemaAppendConverter converter = new SchemaAppendConverter(); + MyDataWriter writer = new MyDataWriter(); + + Task task = setupTask(extractor, writer, converter); + + task.run(); + task.commit(); + Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + 
Assert.assertEquals(converter.records, Lists.newArrayList("a:schema", "b:Schema1")); + Assert.assertEquals(converter.messages, + Lists.newArrayList(new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build()))); + + Assert.assertEquals(writer.records, Lists.newArrayList("a:schema", "b:Schema1")); + Assert.assertEquals(writer.messages, Lists.newArrayList(new MetadataUpdateControlMessage<>( + GlobalMetadata.<String>builder().schema("Schema1").build()), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build()))); + } + + /** + * Test with the converter configured in the list of {@link RecordStreamProcessor}s. + * @throws Exception + */ + @Test + public void testMetadataUpdateWithStreamProcessors() throws Exception { + + MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("a"), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()), new RecordEnvelope<>("b"), + new MetadataUpdateControlMessage(GlobalMetadata.<String>builder().schema("Schema2").build())}); + SchemaAppendConverter converter = new SchemaAppendConverter(); + MyDataWriter writer = new MyDataWriter(); + + Task task = setupTask(extractor, writer, Collections.EMPTY_LIST, Lists.newArrayList(converter)); + + task.run(); + task.commit(); + Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + Assert.assertEquals(converter.records, Lists.newArrayList("a:schema", "b:Schema1")); + Assert.assertEquals(converter.messages, + Lists.newArrayList(new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build()))); + + Assert.assertEquals(writer.records, Lists.newArrayList("a:schema", "b:Schema1")); + 
Assert.assertEquals(writer.messages, Lists.newArrayList(new MetadataUpdateControlMessage<>( + GlobalMetadata.<String>builder().schema("Schema1").build()), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build()))); + } + + /** + * Test the injection of {@link ControlMessage}s + * @throws Exception + */ + @Test + public void testInjectedControlMessages() throws Exception { + + MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("schema:a"), + new RecordEnvelope<>("schema:b"), new RecordEnvelope<>("schema1:c"), new RecordEnvelope<>("schema2:d")}); + SchemaChangeDetectionInjector injector = new SchemaChangeDetectionInjector(); + SchemaAppendConverter converter = new SchemaAppendConverter(); + MyDataWriter writer = new MyDataWriter(); + + Task task = setupTask(extractor, writer, Collections.EMPTY_LIST, + Lists.newArrayList(injector, converter)); + + task.run(); + task.commit(); + Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + Assert.assertEquals(converter.records, Lists.newArrayList("a:schema", "b:schema", "c:schema1", "d:schema2")); + Assert.assertEquals(converter.messages, + Lists.newArrayList(new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("schema1").build()), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("schema2").build()))); + + Assert.assertEquals(writer.records, Lists.newArrayList("a:schema", "b:schema", "c:schema1", "d:schema2")); + Assert.assertEquals(writer.messages, Lists.newArrayList(new MetadataUpdateControlMessage<>( + GlobalMetadata.<String>builder().schema("schema1").build()), + new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("schema2").build()))); + } + @Test public void testAcks() throws Exception { @@ -178,6 +276,11 @@ public class TestRecordStream { } private Task setupTask(Extractor extractor, DataWriterBuilder writer, Converter converter) 
throws Exception { + return setupTask(extractor, writer, Lists.newArrayList(converter), Collections.EMPTY_LIST); + } + + private Task setupTask(Extractor extractor, DataWriterBuilder writer, List<Converter<?,?,?,?>> converters, + List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors) throws Exception { // Create a TaskState TaskState taskState = getEmptyTestTaskState("testRetryTaskId"); taskState.setProp(ConfigurationKeys.TASK_SYNCHRONOUS_EXECUTION_MODEL_KEY, false); @@ -186,7 +289,8 @@ public class TestRecordStream { when(mockTaskContext.getExtractor()).thenReturn(extractor); when(mockTaskContext.getForkOperator()).thenReturn(new IdentityForkOperator()); when(mockTaskContext.getTaskState()).thenReturn(taskState); - when(mockTaskContext.getConverters()).thenReturn(Lists.newArrayList(converter)); + when(mockTaskContext.getConverters()).thenReturn(converters); + when(mockTaskContext.getRecordStreamProcessors()).thenReturn(recordStreamProcessors); when(mockTaskContext.getTaskLevelPolicyChecker(any(TaskState.class), anyInt())) .thenReturn(mock(TaskLevelPolicyChecker.class)); when(mockTaskContext.getRowLevelPolicyChecker()). 
@@ -241,7 +345,8 @@ public class TestRecordStream { @Override public RecordStreamWithMetadata<String, String> recordStream(AtomicBoolean shutdownRequest) throws IOException { - return new RecordStreamWithMetadata<>(Flowable.fromArray(this.stream), "schema"); + return new RecordStreamWithMetadata<>(Flowable.fromArray(this.stream), + GlobalMetadata.<String>builder().schema("schema").build()); } } @@ -306,6 +411,81 @@ public class TestRecordStream { } } + + /** + * Converter that appends the output schema string to the record string + */ + static class SchemaAppendConverter extends Converter<String, String, String, String> { + private List<String> records = new ArrayList<>(); + private List<ControlMessage<String>> messages = new ArrayList<>(); + + @Override + public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException { + return inputSchema; + } + + @Override + public Iterable<String> convertRecord(String outputSchema, String inputRecord, WorkUnitState workUnit) + throws DataConversionException { + String inputWithoutSchema = inputRecord.substring(inputRecord.indexOf(":") + 1); + String outputRecord = inputWithoutSchema + ":" + outputSchema; + records.add(outputRecord); + return Lists.newArrayList(outputRecord); + } + + @Override + public ControlMessageHandler getMessageHandler() { + return messages::add; + } + } + + /** + * Input to this {@link RecordStreamProcessor} is a string of the form "schema:value". + * It will inject a {@link MetadataUpdateControlMessage} when a schema change is detected. 
+ */ + static class SchemaChangeDetectionInjector extends ControlMessageInjector<String, String> { + private List<String> records = new ArrayList<>(); + private List<ControlMessage<String>> messages = new ArrayList<>(); + private GlobalMetadata<String> globalMetadata; + + public Iterable<String> convertRecord(String outputSchema, String inputRecord, WorkUnitState workUnitState) + throws DataConversionException { + + String outputRecord = inputRecord.split(":")[1]; + records.add(outputRecord); + return Lists.newArrayList(outputRecord); + } + + @Override + protected void setInputGlobalMetadata(GlobalMetadata<String> inputGlobalMetadata, WorkUnitState workUnitState) { + this.globalMetadata = inputGlobalMetadata; + } + + @Override + public Iterable<ControlMessage<String>> injectControlMessagesBefore(RecordEnvelope<String> inputRecordEnvelope, + WorkUnitState workUnitState) { + String recordSchema = inputRecordEnvelope.getRecord().split(":")[0]; + + if (!recordSchema.equals(this.globalMetadata.getSchema())) { + return new SingleRecordIterable<>(new MetadataUpdateControlMessage<>( + GlobalMetadata.<String>builder().schema(recordSchema).build())); + } + + return null; + } + + @Override + public Iterable<ControlMessage<String>> injectControlMessagesAfter(RecordEnvelope<String> inputRecordEnvelope, + WorkUnitState workUnitState) { + return null; + } + + @Override + public ControlMessageHandler getMessageHandler() { + return messages::add; + } + } + static class MyFlushDataWriter extends DataWriterBuilder<String, String> implements DataWriter<String> { private List<String> records = new ArrayList<>(); private List<String> messages = new ArrayList<>();
