rmnlchh commented on issue #9282:
URL: https://github.com/apache/hudi/issues/9282#issuecomment-1650228180
> @rmnlchh Just curious, Did you set these configs
>
> ```
> sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
> sc.set("spark.sql.parquet.binaryAsString", "false");
> sc.set("spark.sql.parquet.int96AsTimestamp", "true");
> sc.set("spark.sql.caseSensitive", "false");
> ```
>
> with your deltastreamer also? I will try to reproduce this issue.
Yes, I am adding all of the DeltaStreamer (DS) configs:
println(s"hoodieDeltaStreamerConfig=$hoodieDeltaStreamerConfig")
println(s"typedProperties=$typedProperties")
println("HERE JSC" + jsc.getConf.getAll.mkString)
val hoodieDeltaStreamer = new HoodieDeltaStreamer(hoodieDeltaStreamerConfig,
jsc
, FSUtils.getFs(hoodieDeltaStreamerConfig.targetBasePath, conf),
jsc.hadoopConfiguration
, org.apache.hudi.common.util.Option.of(typedProperties)
)
hoodieDeltaStreamerConfig=Config{targetBasePath='/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/',
targetTableName='published_creative', tableType='MERGE_ON_READ',
baseFileFormat='PARQUET',
propsFilePath='file://XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/src/test/resources/delta-streamer-config/dfs-source.properties',
configs=[],
sourceClassName='org.apache.hudi.utilities.sources.AvroKafkaSource',
sourceOrderingField='AssetValue',
payloadClassName='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload',
schemaProviderClassName='com.cardlytics.datapipeline.deltastreamer.schema.ResourceBasedSchemaProvider',
transformerClassNames=[org.apache.hudi.utilities.transform.SqlQueryBasedTransformer],
sourceLimit=9223372036854775807, operation=UPSERT, filterDupes=false,
enableHiveSync=false, enableMetaSync=false, forceEmptyMetaSync=false,
syncClientToolClassNames=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool,
maxPendingCompactions=5, maxPendingClustering=5, continuousMode=false,
minSyncIntervalSeconds=0, sparkMaster='', commitOnErrors=false,
deltaSyncSchedulingWeight=1, compactSchedulingWeight=1,
clusterSchedulingWeight=1, deltaSyncSchedulingMinShare=0,
compactSchedulingMinShare=0, clusterSchedulingMinShare=0,
forceDisableCompaction=true, checkpoint='null',
initialCheckpointProvider='null', help=false}
typedProperties={spark.sql.avro.compression.codec=snappy,
hoodie.datasource.hive_sync.table=published_creative,
hoodie.datasource.hive_sync.partition_fields=Entity,
hoodie.metadata.index.column.stats.enable=false, hoodie.index.type=BLOOM,
hoodie.datasource.write.reconcile.schema=true,
hoodie.deltastreamer.schemaprovider.source.schema.file=domain/campaignbuild/schema/creative.avsc,
bootstrap.servers=PLAINTEXT://localhost:34873, hoodie.compact.inline=false,
hoodie.deltastreamer.transformer.sql=
SELECT
'Creative' Entity
,o.CreativeId
,o.PreMessageImpression
,o.PostMessageImpression
,o.Assets.Type AssetType
,o.Assets.Slot AssetSlot
,o.Assets.Label AssetLabel
,o.Assets.Value AssetValue
FROM
(SELECT a.CreativeId, a.PreMessageImpression, a.PostMessageImpression,
explode(a.Assets) Assets
FROM
<SRC> a) o
, hoodie.parquet.max.file.size=6291456,
hoodie.datasource.write.recordkey.field=CreativeId,AssetSlot,
hoodie.index.bloom.num_entries=60000,
hoodie.datasource.hive_sync.support_timestamp=true,
hoodie.metadata.enable=false, schema.registry.url=http://localhost:34874,
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator,
hoodie.datasource.write.table.type=MERGE_ON_READ,
hoodie.deltastreamer.source.kafka.topic=CMPN-CmpnPub-AdServer-Creative,
hoodie.datasource.write.hive_style_partitioning=true,
hoodie.metadata.insert.parallelism=1,
hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false,
hoodie.parquet.compression.codec=snappy, spark.io.compression.codec=snappy,
hoodie.deltastreamer.schemaprovider.target.schema.file=domain/campaignbuild/schema/published_creative_table.json,
hoodie.bloom.index.prune.by.ranges=true,
hoodie.datasource.write.partitionpath.field=Entity,
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true,
hoodie.parquet.block.size=6291456,
hoodie.cleaner.fileversions.retained=2, hoodie.table.name=published_creative,
hoodie.upsert.shuffle.parallelism=4,
hoodie.meta.sync.client.tool.class=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool,
spark.sql.parquet.compression.codec=snappy,
hoodie.datasource.write.precombine.field=AssetValue,
hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload,
hoodie.datasource.meta.sync.base.path=/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/,
hoodie.datasource.hive_sync.database=datalake_ods_local,
hoodie.datasource.write.operation=upsert, auto.offset.reset=earliest,
hoodie.metadata.index.column.stats.parallelism=1,
hoodie.compact.inline.max.delta.commits=5}
HERE JSC(spark.sql.hive.convertMetastoreParquet,false)(spark.driver.extraJavaOptions,-XX:+IgnoreUnrecognizedVMOptions
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED)(spark.sql.parquet.datetimeRebaseModeInRead,LEGACY)(spark.sql.legacy.timeParserPolicy,LEGACY)(spark.memory.fraction,0.50)(spark.sql.parquet.outputTimestampType,TIMESTAMP_MICROS)
(spark.driver.port,44679)(spark.sql.parquet.int96AsTimestamp,true)(spark.sql.parquet.binaryAsString,false)(spark.memory.storageFraction,0.20)(spark.serializer,org.apache.spark.serializer.KryoSerializer)(spark.sql.avro.datetimeRebaseModeInWrite,LEGACY)(spark.executor.memoryOverheadFactor,0.20)(spark.sql.parquet.compression.codec,snappy)(spark.master,local[*])(spark.sql.caseSensitive,false)(spark.app.name,CreativeDeltaStreamerTest-creative-deltastreamer-1690266341)(spark.app.id,local-1690280742465)(spark.sql.extension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension)(spark.sql.legacy.parquet.nanosAsLong,false)(spark.io.compression.codec,snappy)(spark.driver.host,127.0.0.1)(spark.io.compression.zstd.bufferSize,64k)(spark.shuffle.compress,true)(spark.sql.avro.datetimeRebaseModeInRead,LEGACY)(spark.sql.avro.compression.codec,snappy)(spark.app.startTime,1690280741849)(spark.executor.id,driver)(spark.sql.parquet.int96RebaseModeInRead,LEGACY)(spark.shuffle.spill.compress,true)(spark.rdd.compress,true)
(spark.sql.adaptive.skewJoin.enabled,true)(spark.broadcast.compress,true)(spark.sql.parquet.datetimeRebaseModeInWrite,LEGACY)(spark.io.compression.zstd.level,1)(spark.driver.memoryOverheadFactor,0.15)(spark.sql.catalog,org.apache.spark.sql.hudi.catalog.HoodieCatalog)(spark.executor.extraJavaOptions,-XX:+IgnoreUnrecognizedVMOptions
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED)(spark.executor.cores,1)(spark.io.compression.lz4.blockSize,64k)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]