[
https://issues.apache.org/jira/browse/HUDI-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jihwan Lee updated HUDI-7711:
-----------------------------
Description:
HoodieMultiTableStreamer initializes the common configs and then deep-copies the related fields into each stream.
Because _propsFilePath_ is never set on the individual streamers, each one falls
back to the default value, which points to a test-resource path.
Also, when MultiTableStreamer is run with {_}--hoodie-conf{_}, each streamer
should inherit those configs.
MultiTable configs (kafka-source.properties):
{code:java}
...
hoodie.streamer.ingestion.tablesToBeIngested=db.tbl1,db.tbl2
hoodie.streamer.ingestion.db.tbl1.configFile=hdfs:///tmp/config_1.properties
hoodie.streamer.ingestion.db.tbl2.configFile=hdfs:///tmp/config_2.properties
... {code}
/tmp/config_1.properties:
{code:java}
...
hoodie.datasource.write.recordkey.field=id
hoodie.streamer.source.kafka.topic=topic1
... {code}
/tmp/config_2.properties:
{code:java}
...
hoodie.datasource.write.recordkey.field=id
hoodie.streamer.source.kafka.topic=topic2
... {code}
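The inheritance described above can be sketched with plain java.util.Properties: the common configs from kafka-source.properties act as the base layer, and each table's configFile (plus any --hoodie-conf overrides) is overlaid on top. The class and method names below (TableConfigMerge, inherit) are hypothetical illustrations, not part of the Hudi code base:

```java
import java.util.Properties;

public class TableConfigMerge {

    // Returns a new Properties where entries in `overlay` win over `base`.
    static Properties inherit(Properties base, Properties overlay) {
        Properties merged = new Properties();
        merged.putAll(base);     // start from the shared configs
        merged.putAll(overlay);  // per-table configs override the base
        return merged;
    }

    public static void main(String[] args) {
        // Shared configs (as in kafka-source.properties above).
        Properties common = new Properties();
        common.setProperty("hoodie.datasource.write.recordkey.field", "id");

        // Per-table configs (as in /tmp/config_1.properties above).
        Properties table1 = new Properties();
        table1.setProperty("hoodie.streamer.source.kafka.topic", "topic1");

        Properties merged = inherit(common, table1);
        System.out.println(merged.getProperty("hoodie.streamer.source.kafka.topic"));      // topic1
        System.out.println(merged.getProperty("hoodie.datasource.write.recordkey.field")); // id
    }
}
```
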
error log (workspace path replaced with \{RUNNING_PATH}):
{code:java}
24/05/04 21:41:01 ERROR config.DFSPropertiesConfiguration: Error reading in
properties from dfs from file
file:{RUNNING_PATH}/src/test/resources/streamer-config/dfs-source.properties
24/05/04 21:41:01 INFO streamer.StreamSync: Shutting down embedded timeline
server
24/05/04 21:41:01 ERROR streamer.HoodieMultiTableStreamer: error while running
MultiTableDeltaStreamer for table: {TABLE}
org.apache.hudi.exception.HoodieIOException: Cannot read properties from dfs
from file
file:{RUNNING_PATH}/src/test/resources/streamer-config/dfs-source.properties
at
org.apache.hudi.common.config.DFSPropertiesConfiguration.addPropsFromFile(DFSPropertiesConfiguration.java:168)
at
org.apache.hudi.common.config.DFSPropertiesConfiguration.<init>(DFSPropertiesConfiguration.java:87)
at
org.apache.hudi.utilities.UtilHelpers.readConfig(UtilHelpers.java:258)
at
org.apache.hudi.utilities.streamer.HoodieStreamer$Config.getProps(HoodieStreamer.java:453)
at
org.apache.hudi.utilities.streamer.StreamSync.getDeducedSchemaProvider(StreamSync.java:714)
at
org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:676)
at
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:568)
at
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:540)
at
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:444)
at
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:874)
at
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
at
org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:216)
at
org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.sync(HoodieMultiTableStreamer.java:457)
at
org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.main(HoodieMultiTableStreamer.java:282)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: File
file:{RUNNING_PATH}/src/test/resources/streamer-config/dfs-source.properties
does not exist
at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:930)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454)
at
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
at
org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:899)
at
org.apache.hudi.storage.hadoop.HoodieHadoopStorage.open(HoodieHadoopStorage.java:97)
at org.apache.hudi.common.config.DFSPro
{code}
> Fix MultiTableStreamer can deal with path of properties file for each streamer
> ------------------------------------------------------------------------------
>
> Key: HUDI-7711
> URL: https://issues.apache.org/jira/browse/HUDI-7711
> Project: Apache Hudi
> Issue Type: Bug
> Components: hudi-utilities
> Environment: hudi0.14.1, Spark3.2
> Reporter: Jihwan Lee
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)