This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 405be173664 [MINOR] Making misc fixes to deltastreamer sources(S3 and
GCS) (#10095)
405be173664 is described below
commit 405be173664b724ca941194136a5b5dcff4bb598
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Nov 22 21:00:33 2023 -0800
[MINOR] Making misc fixes to deltastreamer sources(S3 and GCS) (#10095)
* Making misc fixes to deltastreamer sources
* Fixing test failures
* adding inference to CloudSourceconfig... cloud.data.datafile.format
* Fix the tests for s3 events source
* Fix the tests for s3 events source
---------
Co-authored-by: rmahindra123 <[email protected]>
---
.../main/java/org/apache/hudi/common/util/StringUtils.java | 10 ++++++++++
.../java/org/apache/hudi/common/util/TestStringUtils.java | 7 +++++++
.../org/apache/hudi/utilities/config/CloudSourceConfig.java | 2 +-
.../apache/hudi/utilities/schema/SchemaRegistryProvider.java | 11 +++++++++--
.../hudi/utilities/sources/S3EventsHoodieIncrSource.java | 11 ++++++++++-
5 files changed, 37 insertions(+), 4 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
index d7d79796aec..5b95bc60312 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -173,4 +173,14 @@ public class StringUtils {
}
return input.substring(0, i);
}
+
+ /**
+  * Abbreviates {@code str} to its first {@code headLength} and last {@code tailLength}
+  * characters, joined by {@code "..."}.
+  *
+  * <p>Strings that are null, empty, or no longer than {@code headLength + tailLength} are
+  * returned unchanged. Such inputs are therefore NOT masked, so this method alone is not a
+  * redaction mechanism for short sensitive values. When truncation happens the result is
+  * exactly {@code headLength + tailLength + 3} characters long.
+  *
+  * @param str        the string to abbreviate; may be null
+  * @param headLength number of leading characters to keep; must be non-negative
+  * @param tailLength number of trailing characters to keep; must be non-negative
+  * @return {@code str} itself when no truncation is needed, otherwise head + "..." + tail
+  * @throws IllegalArgumentException if {@code headLength} or {@code tailLength} is negative
+  */
+ public static String truncate(String str, int headLength, int tailLength) {
+   if (headLength < 0 || tailLength < 0) {
+     throw new IllegalArgumentException("headLength and tailLength must be non-negative, got "
+         + headLength + " and " + tailLength);
+   }
+   // Sum in long arithmetic: an int sum of two large limits can overflow to a negative
+   // value, which would make a short string look "too long" and break substring below.
+   if (isNullOrEmpty(str) || str.length() <= (long) headLength + tailLength) {
+     return str;
+   }
+   String head = str.substring(0, headLength);
+   String tail = str.substring(str.length() - tailLength);
+   return head + "..." + tail;
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 3bdf6d48b39..54985056bf0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -114,4 +114,11 @@ public class TestStringUtils {
}
return sb.toString();
}
+
+ @Test
+ public void testTruncate() {
+   // Null and empty inputs are passed through untouched.
+   assertNull(StringUtils.truncate(null, 10, 10));
+   assertEquals("", StringUtils.truncate("", 10, 10));
+   assertEquals("http://use...ons/latest",
+       StringUtils.truncate("http://username:[email protected]:5000/versions/latest", 10, 10));
+   assertEquals("http://abc.com", StringUtils.truncate("http://abc.com", 10, 10));
+   // Boundary: a string exactly headLength + tailLength long is not truncated,
+   // while one character more triggers the head + "..." + tail form.
+   assertEquals("abcdefghij", StringUtils.truncate("abcdefghij", 5, 5));
+   assertEquals("abcde...ghijk", StringUtils.truncate("abcdefghijk", 5, 5));
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index e7b44cf9121..007d36fc704 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -108,7 +108,7 @@ public class CloudSourceConfig extends HoodieConfig {
public static final ConfigProperty<String> DATAFILE_FORMAT = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.data.datafile.format")
- .defaultValue("parquet")
+ .defaultValue(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.defaultValue())
.withAlternatives(DELTA_STREAMER_CONFIG_PREFIX +
"source.cloud.data.datafile.format")
.markAdvanced()
.withDocumentation("Format of the data file. By default, this will be
the same as hoodie.streamer.source.hoodieincr.file.format");
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 780fbb9dc0a..110c8cc2fb1 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -195,7 +195,10 @@ public class SchemaRegistryProvider extends SchemaProvider
{
try {
return parseSchemaFromRegistry(registryUrl);
} catch (Exception e) {
- throw new HoodieSchemaFetchException("Error reading source schema from
registry :" + registryUrl, e);
+ throw new HoodieSchemaFetchException(String.format(
+ "Error reading source schema from registry. Please check %s is
configured correctly. Truncated URL: %s",
+ Config.SRC_SCHEMA_REGISTRY_URL_PROP,
+ StringUtils.truncate(registryUrl, 10, 10)), e);
}
}
@@ -207,7 +210,11 @@ public class SchemaRegistryProvider extends SchemaProvider
{
try {
return parseSchemaFromRegistry(targetRegistryUrl);
} catch (Exception e) {
- throw new HoodieSchemaFetchException("Error reading target schema from
registry :" + targetRegistryUrl, e);
+ throw new HoodieSchemaFetchException(String.format(
+ "Error reading target schema from registry. Please check %s is
configured correctly. If that is not configured then check %s. Truncated URL:
%s",
+ Config.SRC_SCHEMA_REGISTRY_URL_PROP,
+ Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
+ StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 325e494e0ab..61ed02da106 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -35,6 +35,7 @@ import
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.parquet.Strings;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -51,6 +52,7 @@ import static
org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static
org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
import static
org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
@@ -70,6 +72,7 @@ import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMiss
public class S3EventsHoodieIncrSource extends HoodieIncrSource {
private static final Logger LOG =
LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
+ private static final String EMPTY_STRING = "";
private final String srcPath;
private final int numInstantsPerFetch;
private final boolean checkIfFileExists;
@@ -135,7 +138,13 @@ public class S3EventsHoodieIncrSource extends
HoodieIncrSource {
this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
this.numInstantsPerFetch = getIntWithAltKeys(props,
NUM_INSTANTS_PER_FETCH);
this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
- this.fileFormat = getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true);
+
+ // This is to ensure backward compatibility where we were using the
+ // config SOURCE_FILE_FORMAT for file format in previous versions.
+ this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props,
DATAFILE_FORMAT, EMPTY_STRING))
+ ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
+ : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
+
this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
this.queryRunner = queryRunner;
this.cloudDataFetcher = cloudDataFetcher;