This is an automated email from the ASF dual-hosted git repository.

ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6df5743  [GOBBLIN-772][GOBBLIN-724] Implement Schema Comparison 
Strategy during Disctp
6df5743 is described below

commit 6df5743b90741d326a0748df76f87b409a78fddc
Author: Zihan Li <[email protected]>
AuthorDate: Tue May 28 13:37:05 2019 -0700

    [GOBBLIN-772][GOBBLIN-724] Implement Schema Comparison Strategy during 
Disctp
    
    Closes #2637 from ZihanLi58/schemaCheck
---
 .../gobblin/configuration/ConfigurationKeys.java   |   6 ++
 ...leAwareInputStreamExtractorWithCheckSchema.java |  38 ++++---
 .../AvroSchemaCheckDefaultStrategy.java            | 120 +++++++++++++++++++++
 .../util/schema_check/AvroSchemaCheckStrategy.java |  57 ++++++++++
 .../embedded/EmbeddedGobblinDistcpTest.java        |  11 +-
 ...gobblin.restli.throttling.permits.snapshot.json |  19 ++++
 6 files changed, 235 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b628138..09c4c0e 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -936,4 +936,10 @@ public class ConfigurationKeys {
   public static final String GIT_MONITOR_SSH_KNOWN_HOSTS = "knownHosts";
   public static final String GIT_MONITOR_SSH_KNOWN_HOSTS_FILE = 
"knownHostsFile";
   public static final String GIT_MONITOR_JSCH_LOGGER_ENABLED = 
"isJschLoggerEnabled";
+
+  /**
+   * Configuration related to avro schema check strategy
+   */
+  public static final String AVRO_SCHEMA_CHECK_STRATEGY = 
"avro.schema.check.strategy";
+  public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT = 
"org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
index 34c4ab2..eb4f627 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
@@ -26,6 +26,8 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.util.schema_check.AvroSchemaCheckStrategy;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -39,32 +41,42 @@ import 
org.apache.gobblin.source.extractor.DataRecordException;
  * check if the schema matches the expected schema. If not it will abort the 
job.
  */
 
-public class FileAwareInputStreamExtractorWithCheckSchema extends 
FileAwareInputStreamExtractor{
+public class FileAwareInputStreamExtractorWithCheckSchema extends 
FileAwareInputStreamExtractor {
 
-  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file, WorkUnitState state)
-  {
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file, WorkUnitState state) {
     super(fs, file, state);
   }
-  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file)
-  {
+
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file) {
     this(fs, file, null);
   }
 
   @Override
-  protected FileAwareInputStream buildStream(FileSystem fsFromFile)
-      throws DataRecordException, IOException{
-    if(!schemaChecking(fsFromFile))
-    {
+  protected FileAwareInputStream buildStream(FileSystem fsFromFile) throws 
DataRecordException, IOException {
+    if (!schemaChecking(fsFromFile)) {
       throw new DataRecordException("Schema does not match the expected 
schema");
     }
     return super.buildStream(fsFromFile);
   }
 
-  protected boolean schemaChecking(FileSystem fsFromFile)
-      throws IOException {
+  /**
+   * Use {@link AvroSchemaCheckStrategy} to make sure the real schema and the expected schema have matching field names and types.
+   * @param fsFromFile not read by this check; the file is opened through a new {@link Configuration}
+   * @return true when the configured strategy accepts the schema read from the file
+   * @throws IOException if the file cannot be opened as Avro or the strategy cannot be created
+   */
+  protected boolean schemaChecking(FileSystem fsFromFile) throws IOException {
     DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-    DataFileReader<GenericRecord> dataFileReader = new DataFileReader(new FsInput(this.file.getFileStatus().getPath(),fsFromFile), datumReader);
-    Schema schema = dataFileReader.getSchema();
-    return schema.toString().equals(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+    // try-with-resources closes the reader (and the file it opened) on every exit path.
+    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new FsInput(this.file.getFileStatus().getPath(), new Configuration()), datumReader)) {
+      Schema schema = dataFileReader.getSchema();
+      Schema expectedSchema = new Schema.Parser().parse(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+      AvroSchemaCheckStrategy strategy = AvroSchemaCheckStrategy.AvroSchemaCheckStrategyFactory.create(this.state);
+      if (strategy == null) {
+        throw new IOException("schema check strategy cannot be initialized");
+      }
+      return strategy.compare(expectedSchema, schema);
+    }
   }
+
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/schema_check/AvroSchemaCheckDefaultStrategy.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/schema_check/AvroSchemaCheckDefaultStrategy.java
new file mode 100644
index 0000000..1c7696d
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/schema_check/AvroSchemaCheckDefaultStrategy.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.schema_check;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+
+
+/**
+ * Default strategy to check the compatibility of Avro schemas.
+ */
+public class AvroSchemaCheckDefaultStrategy implements AvroSchemaCheckStrategy {
+  /**
+   * Compares the type and name of the two schemas and then, recursively,
+   * their element, value, field and branch schemas.
+   *
+   * @param expected The expected schema
+   * @param toValidate The real schema
+   * @return true when expected schema and toValidate schema have matching
+   *         field names and types
+   * @throws AvroRuntimeException if the schema type is not recognised
+   */
+  @Override
+  public boolean compare(Schema expected, Schema toValidate) {
+    if (toValidate.getType() != expected.getType()
+        || !toValidate.getName().equals(expected.getName())) {
+      return false;
+    }
+    switch (toValidate.getType()) {
+      case NULL:
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BYTES:
+      case STRING:
+        return true;
+      case ARRAY:
+        return compare(expected.getElementType(), toValidate.getElementType());
+      case MAP:
+        return compare(expected.getValueType(), toValidate.getValueType());
+      case FIXED:
+        // Type and name were already checked above, so only the size is
+        // left. Returning here stops FIXED from falling through into the
+        // ENUM branch, whose getEnumSymbols() call is not valid for a
+        // fixed schema.
+        return toValidate.getFixedSize() == expected.getFixedSize();
+      case ENUM: {
+        // Both schemas must declare exactly the same set of symbols.
+        // Returning here stops ENUM from falling through into the RECORD
+        // branch, whose getFields() call is not valid for an enum schema.
+        final Set<String> expectedSymbols =
+            new HashSet<>(expected.getEnumSymbols());
+        final Set<String> toValidateSymbols =
+            new HashSet<>(toValidate.getEnumSymbols());
+        return expectedSymbols.equals(toValidateSymbols);
+      }
+      case RECORD: {
+        // Same number of fields, and every expected field must exist in
+        // toValidate with a matching schema.
+        if (toValidate.getFields().size() != expected.getFields().size()) {
+          return false;
+        }
+        for (final Schema.Field expectedField : expected.getFields()) {
+          final Schema.Field toValidateField = toValidate.getField(expectedField.name());
+          if (toValidateField == null) {
+            // expected field has no counterpart in the toValidate record
+            return false;
+          }
+          if (!compare(expectedField.schema(), toValidateField.schema())) {
+            return false;
+          }
+        }
+        return true;
+      }
+      case UNION: {
+        // Same number of branches, and every toValidate branch must pair
+        // up with a distinct, not yet matched, expected branch.
+        if (toValidate.getTypes().size() != expected.getTypes().size()) {
+          return false;
+        }
+        final Set<Schema> unmatched = new HashSet<>(expected.getTypes());
+        for (final Schema toValidateType : toValidate.getTypes()) {
+          Schema match = null;
+          for (final Schema candidate : unmatched) {
+            if (compare(candidate, toValidateType)) {
+              match = candidate;
+              break;
+            }
+          }
+          if (match == null) {
+            return false;
+          }
+          unmatched.remove(match);
+        }
+        return true;
+      }
+      default:
+        throw new AvroRuntimeException("Unknown schema type: " + toValidate.getType());
+    }
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/schema_check/AvroSchemaCheckStrategy.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/schema_check/AvroSchemaCheckStrategy.java
new file mode 100644
index 0000000..a8e2045
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/schema_check/AvroSchemaCheckStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.schema_check;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+/** The strategy to compare Avro schemas. */
+public interface AvroSchemaCheckStrategy {
+  /**
+   * A factory to instantiate the strategy.
+   */
+  @Slf4j
+  class AvroSchemaCheckStrategyFactory {
+    /**
+     * Use the configuration to create a schema check strategy.
+     * @param state state that may name the strategy class to use; a default class is used otherwise
+     * @return the strategy, or null if it cannot be loaded or instantiated
+     */
+    public static AvroSchemaCheckStrategy create(WorkUnitState state) {
+      try {
+        String className = state.getProp(ConfigurationKeys.AVRO_SCHEMA_CHECK_STRATEGY,
+            ConfigurationKeys.AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT);
+        return (AvroSchemaCheckStrategy) Class.forName(className).newInstance();
+      } catch (Exception e) {
+        // Log the exception itself so its type and stack trace are kept.
+        log.error("Cannot create Avro schema check strategy", e);
+        return null;
+      }
+    }
+  }
+  /**
+   * Make sure schema toValidate and expected have matching names and types.
+   * @param expected The expected schema
+   * @param toValidate The real schema
+   * @return true when the two schemas match
+   */
+  boolean compare(Schema expected, Schema toValidate);
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 96d917b..60d93a5 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -30,7 +30,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
-import 
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractorWithCheckSchema;
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
@@ -118,15 +117,21 @@ public class EmbeddedGobblinDistcpTest {
         new Path(tmpTarget.getAbsolutePath()));
     embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
     embedded.setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, 
SchemaCheckedCopySource.class.getName());
+    embedded.setConfiguration(ConfigurationKeys.AVRO_SCHEMA_CHECK_STRATEGY, 
"org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy");
     //test when schema is not the expected one, the job will be aborted.
-    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
"{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo1\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
+    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
"{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo1\",\"type\":[\"null\",\"int\"],\"doc\":\"this
 is for test\",\"default\":null}]}");
     JobExecutionResult result = embedded.run();
     Assert.assertTrue(new File(tmpSource, fileName).exists());
     Assert.assertFalse(result.isSuccessful());
     Assert.assertFalse(new File(tmpTarget, fileName).exists());
+    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
"{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo\",\"type\":[\"string\",\"int\"],\"doc\":\"this
 is for test\",\"default\":null}]}");
+    result = embedded.run();
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertFalse(result.isSuccessful());
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
 
     //test when schema is the expected one, the job will succeed.
-    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
"{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
+    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
"{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo\",\"type\":[\"null\",\"int\"],\"doc\":\"this
 is for test\",\"default\":null}]}");
     result = embedded.run();
     Assert.assertTrue(result.isSuccessful());
     Assert.assertTrue(new File(tmpSource, fileName).exists());
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/snapshot/org.apache.gobblin.restli.throttling.permits.snapshot.json
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/snapshot/org.apache.gobblin.restli.throttling.permits.snapshot.json
index 90fdd8d..1ea00bc 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/snapshot/org.apache.gobblin.restli.throttling.permits.snapshot.json
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/snapshot/org.apache.gobblin.restli.throttling.permits.snapshot.json
@@ -17,6 +17,18 @@
       "type" : "long",
       "doc" : "Client should not try to acquire permits before this delay has 
passed.",
       "optional" : true
+    }, {
+      "name" : "waitForPermitUseMillis",
+      "type" : "long",
+      "doc" : "Client must wait this many millis before allocating provided 
permits.",
+      "default" : 0,
+      "optional" : true
+    }, {
+      "name" : "unsatisfiablePermits",
+      "type" : "long",
+      "doc" : "If larger than 0, specifies request larger than this number are 
impossible to satisfy by the policy.",
+      "default" : 0,
+      "optional" : true
     } ]
   }, {
     "type" : "record",
@@ -35,11 +47,18 @@
       "name" : "minPermits",
       "type" : "long",
       "doc" : "Minimum number of useful permits.",
+      "default" : 0,
       "optional" : true
     }, {
       "name" : "requestorIdentifier",
       "type" : "string",
       "doc" : "Identifier of the service requesting the permits."
+    }, {
+      "name" : "version",
+      "type" : "int",
+      "doc" : "Protocol version, see ThrottlingProtocolVersion.java. Allows 
the server to avoid asking the client for unsupported operations.",
+      "default" : 0,
+      "optional" : true
     } ]
   }, {
     "type" : "record",

Reply via email to