[ 
https://issues.apache.org/jira/browse/HUDI-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jihwan Lee updated HUDI-7711:
-----------------------------
    Description: 
HoodieMultiTableStreamer initializes the common configs, then deep-copies the related fields into each per-table streamer.

Because _propsFilePath_ is not propagated to each streamer, every streamer falls back to the default value, which points to a test resource file.

Also, when MultiTableStreamer is run with {_}--hoodie-conf{_}, each streamer should inherit those configs (similar to inheritance).
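
The expected behavior can be sketched in plain Java. This is an illustrative sketch using `java.util.Properties`, not Hudi's actual implementation; the class and the `inheritProps` helper are hypothetical names. The idea is that each per-table config starts from the common configs (including _propsFilePath_ and any {_}--hoodie-conf{_} overrides) and only overrides what differs:

```java
import java.util.Properties;

// Hypothetical sketch of per-table config inheritance.
// Not Hudi's actual API: class and method names are illustrative only.
public class ConfigInheritanceSketch {

    // Copy every parent (common) property the child does not already define,
    // so per-table config files only need to override what differs.
    static Properties inheritProps(Properties parent, Properties child) {
        Properties merged = new Properties();
        parent.forEach(merged::put); // start from the common configs
        child.forEach(merged::put);  // per-table values win on conflict
        return merged;
    }

    public static void main(String[] args) {
        Properties parent = new Properties();
        parent.setProperty("hoodie.datasource.write.recordkey.field", "id");
        parent.setProperty("propsFilePath", "hdfs:///tmp/kafka-source.properties");

        Properties tbl1 = new Properties();
        tbl1.setProperty("hoodie.streamer.source.kafka.topic", "topic1");

        Properties merged = inheritProps(parent, tbl1);
        // The per-table streamer sees the parent's propsFilePath instead of
        // falling back to the test-resource default.
        System.out.println(merged.getProperty("propsFilePath"));
        System.out.println(merged.getProperty("hoodie.streamer.source.kafka.topic"));
    }
}
```

With this merge order, the bug described above would not occur: no streamer would ever see the default test-resource path, because the parent's _propsFilePath_ is always carried over.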

 

MultiTable configs (kafka-source.properties):

 
{code:java}
...
hoodie.streamer.ingestion.tablesToBeIngested=db.tbl1,db.tbl2
hoodie.streamer.ingestion.db.tbl1.configFile=hdfs:///tmp/config_1.properties
hoodie.streamer.ingestion.db.tbl2.configFile=hdfs:///tmp/config_2.properties
... {code}
 

 

/tmp/config_1.properties:

 
{code:java}
...
hoodie.datasource.write.recordkey.field=id
hoodie.streamer.source.kafka.topic=topic1
... {code}
 

 

/tmp/config_2.properties:
{code:java}
...
hoodie.datasource.write.recordkey.field=id
hoodie.streamer.source.kafka.topic=topic2
... {code}
 

Error log (the workspace path is replaced with \{RUNNING_PATH}):

 
{code:java}
24/05/04 21:41:01 ERROR config.DFSPropertiesConfiguration: Error reading in properties from dfs from file file:{RUNNING_PATH}/src/test/resources/streamer-config/dfs-source.properties
24/05/04 21:41:01 INFO streamer.StreamSync: Shutting down embedded timeline server
24/05/04 21:41:01 ERROR streamer.HoodieMultiTableStreamer: error while running MultiTableDeltaStreamer for table: {TABLE}
org.apache.hudi.exception.HoodieIOException: Cannot read properties from dfs from file file:{RUNNING_PATH}/src/test/resources/streamer-config/dfs-source.properties
        at org.apache.hudi.common.config.DFSPropertiesConfiguration.addPropsFromFile(DFSPropertiesConfiguration.java:168)
        at org.apache.hudi.common.config.DFSPropertiesConfiguration.<init>(DFSPropertiesConfiguration.java:87)
        at org.apache.hudi.utilities.UtilHelpers.readConfig(UtilHelpers.java:258)
        at org.apache.hudi.utilities.streamer.HoodieStreamer$Config.getProps(HoodieStreamer.java:453)
        at org.apache.hudi.utilities.streamer.StreamSync.getDeducedSchemaProvider(StreamSync.java:714)
        at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:676)
        at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:568)
        at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:540)
        at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:444)
        at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:874)
        at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
        at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:216)
        at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.sync(HoodieMultiTableStreamer.java:457)
        at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.main(HoodieMultiTableStreamer.java:282)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: File file:{RUNNING_PATH}/src/test/resources/streamer-config/dfs-source.properties does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:930)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:899)
        at org.apache.hudi.storage.hadoop.HoodieHadoopStorage.open(HoodieHadoopStorage.java:97)
        at org.apache.hudi.common.config.DFSPro
{code}
 

 


> Fix MultiTableStreamer to handle the properties file path for each streamer
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-7711
>                 URL: https://issues.apache.org/jira/browse/HUDI-7711
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: hudi-utilities
>         Environment: hudi0.14.1, Spark3.2
>            Reporter: Jihwan Lee
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
