vinothchandar commented on a change in pull request #3075:
URL: https://github.com/apache/hudi/pull/3075#discussion_r660152303
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -111,6 +112,18 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
     }
   }
+  public static Map<String, String> getExtraMetadata(Map<String, String> properties) {
+    Map<String, String> extraMetadataMap = new HashMap<>();
+    if (properties.containsKey(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY())) {
+      for (Map.Entry<String, String> entry : properties.entrySet()) {
+        if (entry.getKey().contains(properties.get(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY()))) {
Review comment:
startsWith()? contains() would also match keys that merely embed the prefix somewhere in the middle.
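For illustration, a standalone sketch of the difference between the two checks; the key and prefix strings below are made up for the demo and are not taken from the PR:

```java
public class PrefixCheckDemo {
  public static void main(String[] args) {
    // Hypothetical prefix and property key, for demonstration only.
    String prefix = "_meta.";
    String key = "some._meta.ish.key";

    // contains() matches the prefix anywhere inside the key...
    System.out.println(key.contains(prefix));   // true

    // ...while startsWith() only matches a true prefix.
    System.out.println(key.startsWith(prefix)); // false
  }
}
```

So with contains(), unrelated properties that happen to embed the configured prefix mid-key would leak into the commit metadata.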
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -337,6 +337,11 @@ object HoodieSparkSqlWriter {
       throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
     }
     val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, schema.toString)
+
+    val metaMap = parameters.filter(kv =>
+      kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+    val javaMap = new util.HashMap[String, String](mapAsJavaMap(metaMap))
Review comment:
rename: javaMetaMap?
##########
File path:
hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##########
@@ -78,6 +85,7 @@ public void onDataWriterCommit(WriterCommitMessage message) {
@Override
public void commit(WriterCommitMessage[] messages) {
+
Review comment:
remove the extra blank line?
##########
File path:
hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
##########
@@ -47,12 +54,18 @@
@Test
public void testDataSourceWriter() throws Exception {
+ testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP);
+ }
+
+  private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
String instantTime = "001";
// init writer
+
Review comment:
please remove all the extra blank lines?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -111,6 +112,18 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
     }
   }
+  public static Map<String, String> getExtraMetadata(Map<String, String> properties) {
+    Map<String, String> extraMetadataMap = new HashMap<>();
+    if (properties.containsKey(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY())) {
+      for (Map.Entry<String, String> entry : properties.entrySet()) {
Review comment:
can we rewrite this with a lambda/stream, if possible, so it reads more nicely?
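One possible stream-based shape for this method, as a sketch only; the constant below is a hypothetical stand-in for DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY() and its value is assumed, not taken from the PR:

```java
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

public class ExtraMetadataSketch {
  // Hypothetical stand-in for DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().
  static final String COMMIT_METADATA_KEYPREFIX_OPT_KEY =
      "hoodie.datasource.write.commitmeta.key.prefix";

  public static Map<String, String> getExtraMetadata(Map<String, String> properties) {
    String prefix = properties.get(COMMIT_METADATA_KEYPREFIX_OPT_KEY);
    if (prefix == null) {
      return Collections.emptyMap();
    }
    // Collect every property whose key starts with the configured prefix.
    return properties.entrySet().stream()
        .filter(e -> e.getKey().startsWith(prefix))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```

This also folds in the startsWith() suggestion from the earlier comment and drops the mutable accumulator map.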
##########
File path:
hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
##########
@@ -47,12 +54,18 @@
@Test
public void testDataSourceWriter() throws Exception {
+ testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP);
+ }
+
+  private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
String instantTime = "001";
// init writer
+
     HoodieDataSourceInternalWriter dataSourceInternalWriter =
-        new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+        new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf,
+            new DataSourceOptions(extraMetadata));
Review comment:
Not sure why we are constructing DataSourceOptions out of extraMetadata; it feels a bit confusing.
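As a hypothetical alternative (not the PR's actual API), the writer could accept the metadata map directly instead of wrapping it in DataSourceOptions, which keeps the intent explicit; a minimal sketch with an invented class name:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: take the extra commit metadata as a plain map rather
// than smuggling it through a Spark DataSourceOptions wrapper.
public class WriterCtorSketch {
  private final Map<String, String> extraMetadata;

  public WriterCtorSketch(Map<String, String> extraMetadata) {
    // Normalize null to an empty map so callers without metadata stay simple.
    this.extraMetadata = extraMetadata == null ? Collections.emptyMap() : extraMetadata;
  }

  public Map<String, String> getExtraMetadata() {
    return extraMetadata;
  }
}
```

The test call site would then read `new HoodieDataSourceInternalWriter(..., extraMetadata)` with no intermediate options object, under the assumption that nothing else in the writer needs DataSourceOptions.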
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]