This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c7e4a6e  [GOBBLIN-859] let writer pass latest schema to WorkUnitState
c7e4a6e is described below

commit c7e4a6efbb08c433c2be69958eebae3b328e347a
Author: Zihan Li <[email protected]>
AuthorDate: Wed Aug 14 18:50:40 2019 -0700

    [GOBBLIN-859] let writer pass latest schema to WorkUnitState
    
    Closes #2714 from ZihanLi58/GOBBLIN-859
---
 .../apache/gobblin/writer/PartitionedDataWriter.java  | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 3dad0ef..3181bd6 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -48,7 +48,6 @@ import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
 import 
org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
 import org.apache.gobblin.records.ControlMessageHandler;
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.MetadataUpdateControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -67,6 +66,7 @@ import 
org.apache.gobblin.writer.partitioner.WriterPartitioner;
 @Slf4j
 public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements 
FinalState, SpeculativeAttemptAwareConstruct, WatermarkAwareWriter<D> {
 
+  public static final String WRITER_LATEST_SCHEMA = "writer.latest.schema";
   private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
       new 
GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());
 
@@ -96,6 +96,9 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
     this.closer = Closer.create();
     this.writerBuilder = builder;
     this.controlMessageHandler = new PartitionDataWriterMessageHandler();
+    if(builder.schema != null) {
+      this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
+    }
     this.partitionWriters = CacheBuilder.newBuilder().build(new 
CacheLoader<GenericRecord, DataWriter<D>>() {
       @Override
       public DataWriter<D> load(final GenericRecord key)
@@ -118,19 +121,19 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
 
     if (state.contains(ConfigurationKeys.WRITER_PARTITIONER_CLASS)) {
       Preconditions.checkArgument(builder instanceof 
PartitionAwareDataWriterBuilder, String
-              .format("%s was specified but the writer %s does not support 
partitioning.",
-                  ConfigurationKeys.WRITER_PARTITIONER_CLASS, 
builder.getClass().getCanonicalName()));
+          .format("%s was specified but the writer %s does not support 
partitioning.",
+              ConfigurationKeys.WRITER_PARTITIONER_CLASS, 
builder.getClass().getCanonicalName()));
 
       try {
         this.shouldPartition = true;
         this.builder = 
Optional.of(PartitionAwareDataWriterBuilder.class.cast(builder));
         this.partitioner = 
Optional.of(WriterPartitioner.class.cast(ConstructorUtils
-                
.invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)),
 state,
-                    builder.getBranches(), builder.getBranch())));
+            
.invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)),
 state,
+                builder.getBranches(), builder.getBranch())));
         Preconditions
             
.checkArgument(this.builder.get().validatePartitionSchema(this.partitioner.get().partitionSchema()),
 String
-                    .format("Writer %s does not support schema from 
partitioner %s",
-                        builder.getClass().getCanonicalName(), 
this.partitioner.getClass().getCanonicalName()));
+                .format("Writer %s does not support schema from partitioner 
%s",
+                    builder.getClass().getCanonicalName(), 
this.partitioner.getClass().getCanonicalName()));
       } catch (ReflectiveOperationException roe) {
         throw new IOException(roe);
       }
@@ -321,6 +324,8 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
       if (message instanceof MetadataUpdateControlMessage) {
         
PartitionedDataWriter.this.writerBuilder.withSchema(((MetadataUpdateControlMessage)
 message)
             .getGlobalMetadata().getSchema());
+        state.setProp(WRITER_LATEST_SCHEMA, ((MetadataUpdateControlMessage) 
message)
+            .getGlobalMetadata().getSchema());
       }
 
       for (DataWriter writer : 
PartitionedDataWriter.this.partitionWriters.asMap().values()) {

Reply via email to