afilipchik commented on a change in pull request #1566:
URL: https://github.com/apache/incubator-hudi/pull/1566#discussion_r426074416



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+
+import java.util.Set;
+
+/**
+ * Tracks already processed schemas.
+ */
+public class SchemaSet implements Serializable {
+

Review comment:
       Should we add a serialVersionUID? If it is not specified and anything the 
class imports is shaded, the auto-generated value is affected. That causes 
issues if the classpath contains more than one version of the class, each 
shaded differently. 
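
       A minimal sketch of what pinning the value could look like. 
`SchemaSetSketch` and its two methods are hypothetical stand-ins, not the code 
in this PR, and the fingerprint is passed as a plain `long` to keep the example 
self-contained:

```java
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for SchemaSet; method names are illustrative only.
class SchemaSetSketch implements Serializable {

  // Declared explicitly so the stream identifier does not depend on the
  // value the JVM would otherwise compute from the class structure.
  private static final long serialVersionUID = 1L;

  private final Set<Long> processedSchema = new HashSet<>();

  boolean isSchemaPresent(long fingerprint) {
    return processedSchema.contains(fingerprint);
  }

  void addSchema(long fingerprint) {
    processedSchema.add(fingerprint);
  }
}
```

With the field declared, `ObjectStreamClass.lookup(...)` reports the declared 
value instead of a computed one.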

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -81,11 +66,22 @@ private static Schema getSchema(String registryUrl) throws IOException {
 
   @Override
   public Schema getSourceSchema() {
-    return schema;
+    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    try {
+      return getSchema(registryUrl);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe);
+    }
   }
 
   @Override
   public Schema getTargetSchema() {
-    return targetSchema;
+    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    try {
+      return getSchema(targetRegistryUrl);

Review comment:
       This might result in target schema != source schema when targetRegistryUrl 
is not specified, because the schema might change between the getSourceSchema 
and getTargetSchema calls. Is that a problem? 
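
       One possible way to avoid the mismatch, sketched under assumptions: fetch 
the source schema once and reuse it when no distinct target URL is configured. 
`SchemaPairSketch`, `fetch`, and the `fetcher` function are hypothetical names, 
not Hudi APIs, and schemas are modelled as plain strings:

```java
import java.util.function.Function;

// Hypothetical sketch; none of these names exist in Hudi.
class SchemaPairSketch {
  final String source;
  final String target;

  private SchemaPairSketch(String source, String target) {
    this.source = source;
    this.target = target;
  }

  // Calls the registry once for the source schema and reuses that result
  // as the target schema unless a different target URL is given.
  static SchemaPairSketch fetch(String sourceUrl, String targetUrl, Function<String, String> fetcher) {
    String source = fetcher.apply(sourceUrl);
    boolean sameRegistry = targetUrl == null || targetUrl.equals(sourceUrl);
    String target = sameRegistry ? source : fetcher.apply(targetUrl);
    return new SchemaPairSketch(source, target);
  }
}
```

Whether a snapshot per run is acceptable depends on how often the provider is 
expected to pick up registry changes.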

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
##########
@@ -59,6 +60,10 @@ public void compact(HoodieInstant instant) throws IOException {
           "Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
     }
     // Commit compaction
-    compactionClient.commitCompaction(instant.getTimestamp(), res, Option.empty());
+    writeClient.commitCompaction(instant.getTimestamp(), res, Option.empty());
+  }
+
+  public void updateWriteClient(HoodieWriteClient writeClient) {

Review comment:
       Is this used anywhere? 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+
+import java.util.Set;
+
+/**
+ * Tracks already processed schemas.
+ */
+public class SchemaSet implements Serializable {
+
+  private final Set<Long> processedSchema = new HashSet<>();

Review comment:
       Will this grow indefinitely? How would we remove old schemas? 
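
       For illustration, one way to bound the set is to cap its size and evict 
the oldest inserted fingerprint. `BoundedFingerprintSet` and `maxEntries` are 
hypothetical names, and this assumes it is acceptable for an evicted schema to 
be treated as unprocessed if it shows up again:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a size-capped fingerprint set; not part of this PR.
class BoundedFingerprintSet {

  // Insertion-ordered map that drops its eldest entry once the cap is exceeded.
  private static final class CappedMap extends LinkedHashMap<Long, Boolean> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    CappedMap(int maxEntries) {
      this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
      return size() > maxEntries;
    }
  }

  private final Set<Long> fingerprints;

  BoundedFingerprintSet(int maxEntries) {
    this.fingerprints = Collections.newSetFromMap(new CappedMap(maxEntries));
  }

  void add(long fingerprint) {
    fingerprints.add(fingerprint);
  }

  boolean contains(long fingerprint) {
    return fingerprints.contains(fingerprint);
  }

  int size() {
    return fingerprints.size();
  }
}
```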

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -81,11 +66,22 @@ private static Schema getSchema(String registryUrl) throws IOException {
 
   @Override
   public Schema getSourceSchema() {
-    return schema;
+    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);

Review comment:
       Is this called only once during a run? Will it be an issue if it is called 
more than once and a slightly different schema is returned? 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -464,21 +464,25 @@ private void shutdownCompactor(boolean error) {
      */
     protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) {
       if (cfg.isAsyncCompactionEnabled()) {
-        asyncCompactService = new AsyncCompactService(jssc, writeClient);
-        // Enqueue existing pending compactions first
-        HoodieTableMetaClient meta =
-            new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
-        List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
-        pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
-        asyncCompactService.start((error) -> {
-          // Shutdown DeltaSync
-          shutdown(false);
-          return true;
-        });
-        try {
-          asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
-        } catch (InterruptedException ie) {
-          throw new HoodieException(ie);
+        if (null != asyncCompactService) {

Review comment:
       It would be great to have some documentation on why it is done this way.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

