This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 405be173664 [MINOR] Making misc fixes to deltastreamer sources(S3 and GCS) (#10095)
405be173664 is described below

commit 405be173664b724ca941194136a5b5dcff4bb598
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Nov 22 21:00:33 2023 -0800

    [MINOR] Making misc fixes to deltastreamer sources(S3 and GCS) (#10095)
    
    * Making misc fixes to deltastreamer sources
    
    * Fixing test failures
    
    * adding inference to CloudSourceconfig... cloud.data.datafile.format
    
    * Fix the tests for s3 events source
    
    * Fix the tests for s3 events source
    
    ---------
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../main/java/org/apache/hudi/common/util/StringUtils.java    | 10 ++++++++++
 .../java/org/apache/hudi/common/util/TestStringUtils.java     |  7 +++++++
 .../org/apache/hudi/utilities/config/CloudSourceConfig.java   |  2 +-
 .../apache/hudi/utilities/schema/SchemaRegistryProvider.java  | 11 +++++++++--
 .../hudi/utilities/sources/S3EventsHoodieIncrSource.java      | 11 ++++++++++-
 5 files changed, 37 insertions(+), 4 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
index d7d79796aec..5b95bc60312 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -173,4 +173,14 @@ public class StringUtils {
     }
     return input.substring(0, i);
   }
+
+  public static String truncate(String str, int headLength, int tailLength) {
+    if (isNullOrEmpty(str) || str.length() <= headLength + tailLength) {
+      return str;
+    }
+    String head = str.substring(0, headLength);
+    String tail = str.substring(str.length() - tailLength);
+
+    return head + "..." + tail;
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 3bdf6d48b39..54985056bf0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -114,4 +114,11 @@ public class TestStringUtils {
     }
     return sb.toString();
   }
+
+  @Test
+  public void testTruncate() {
+    assertNull(StringUtils.truncate(null, 10, 10));
+    assertEquals("http://use...ons/latest", StringUtils.truncate("http://username:[email protected]:5000/versions/latest", 10, 10));
+    assertEquals("http://abc.com", StringUtils.truncate("http://abc.com", 10, 10));
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index e7b44cf9121..007d36fc704 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -108,7 +108,7 @@ public class CloudSourceConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> DATAFILE_FORMAT = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.datafile.format")
-      .defaultValue("parquet")
+      .defaultValue(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.defaultValue())
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.datafile.format")
       .markAdvanced()
       .withDocumentation("Format of the data file. By default, this will be the same as hoodie.streamer.source.hoodieincr.file.format");
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 780fbb9dc0a..110c8cc2fb1 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -195,7 +195,10 @@ public class SchemaRegistryProvider extends SchemaProvider 
{
     try {
       return parseSchemaFromRegistry(registryUrl);
     } catch (Exception e) {
-      throw new HoodieSchemaFetchException("Error reading source schema from registry :" + registryUrl, e);
+      throw new HoodieSchemaFetchException(String.format(
+          "Error reading source schema from registry. Please check %s is configured correctly. Truncated URL: %s",
+          Config.SRC_SCHEMA_REGISTRY_URL_PROP,
+          StringUtils.truncate(registryUrl, 10, 10)), e);
     }
   }
 
@@ -207,7 +210,11 @@ public class SchemaRegistryProvider extends SchemaProvider 
{
     try {
       return parseSchemaFromRegistry(targetRegistryUrl);
     } catch (Exception e) {
-      throw new HoodieSchemaFetchException("Error reading target schema from registry :" + targetRegistryUrl, e);
+      throw new HoodieSchemaFetchException(String.format(
+          "Error reading target schema from registry. Please check %s is configured correctly. If that is not configured then check %s. Truncated URL: %s",
+          Config.SRC_SCHEMA_REGISTRY_URL_PROP,
+          Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
+          StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
     }
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 325e494e0ab..61ed02da106 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -35,6 +35,7 @@ import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 
+import org.apache.parquet.Strings;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -51,6 +52,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
@@ -70,6 +72,7 @@ import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMiss
 public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
+  private static final String EMPTY_STRING = "";
   private final String srcPath;
   private final int numInstantsPerFetch;
   private final boolean checkIfFileExists;
@@ -135,7 +138,13 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
     this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
     this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
-    this.fileFormat = getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true);
+
+    // This is to ensure backward compatibility where we were using the
+    // config SOURCE_FILE_FORMAT for file format in previous versions.
+    this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
+        ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
+        : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
+
     this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;

Reply via email to