Kangho-Lee opened a new issue, #10483:
URL: https://github.com/apache/hudi/issues/10483

   Hello guys.
   this 
[post](https://hudi.apache.org/blog/2020/01/15/delete-support-in-hudi/#deletion-with-hoodiedeltastreamer)
 is from january 2020, any updates about deletion with deltastreamer? Is there 
a way to avoid having to add this field to incoming records for existing hudi 
table?
   I want hard delete an existing table rather than creating a new table with 
`_hoodie_is_deleted` field.
   
   When I tried to change the schema by adding the _hoodie_is_deleted field to 
the existing table by using transformer sql like
   ```
   hoodie.streamer.transformer.sql=SELECT *, if(__deleted = "false" or 
__deleted is null, cast(false as boolean), cast(true as boolean)) end AS 
_hoodie_is_deleted FROM <SRC> a
   ```
   an error occurred when merging the parquet files in the existing partition 
directory like
   ```
   Failed to merge old record into new file for key 5bfe8b1f77e686a37443cf91 
from old file xxxx to new file xxxx with writerSchema
   ...
   Null-value for required field: _hoodie_is_deleted
   ```
   
   I use hudi version 0.14.0 and conf is below (Delete personal/company 
information)
   ```
   ompaction.schedule.enabled: false
   hoodie.auto.adjust.lock.configs: true
   hoodie.bloom.index.parallelism: 100
   hoodie.bloom.index.update.partition.path: true
   hoodie.bulkinsert.shuffle.parallelism: 200
   hoodie.clean.automatic: false
   hoodie.datasource.hive_sync.database: test
   hoodie.datasource.hive_sync.metastore.uris: 
thrift://ranger-hms-oozie1.bdp.bdata.ai:9083,thrift://ranger-hms-oozie2.bdp.bdata.ai:9083
   hoodie.datasource.hive_sync.mode: hms
   hoodie.datasource.hive_sync.partition_extractor_class: 
org.apache.hudi.hive.MultiPartKeysValueExtractor
   hoodie.datasource.hive_sync.table: greeter_test
   hoodie.datasource.hive_sync.use_jdbc: false
   hoodie.datasource.meta.sync.enable: true
   hoodie.datasource.write.drop.partition.columns: false
   hoodie.datasource.write.keygenerator.class: 
org.apache.hudi.keygen.NonpartitionedKeyGenerator
   hoodie.datasource.write.partitionpath.field: 
   hoodie.datasource.write.precombine.field: __ordering_field
   hoodie.datasource.write.reconcile.schema: true
   hoodie.datasource.write.recordkey.field: user_oid
   hoodie.datasource.write.row.writer.enable: true
   hoodie.delete.shuffle.parallelism: 200
   hoodie.deltastreamer.ingestion.targetBasePath: 
   hoodie.index.type: GLOBAL_BLOOM
   hoodie.insert.shuffle.parallelism: 200
   hoodie.meta.sync.client.tool.class: org.apache.hudi.hive.HiveSyncTool
   hoodie.metadata.enable: true
   hoodie.parquet.compression.codec: snappy
   hoodie.parquet.small.file.limit: 134217728
   hoodie.streamer.schemaprovider.registry.schemaconverter: 
   hoodie.streamer.schemaprovider.registry.url: 
   hoodie.streamer.schemaprovider.spark_avro_post_processor.enable: false
   hoodie.streamer.source.dfs.root: 
   hoodie.streamer.transformer.sql: SELECT *, if(__deleted = "false" or 
__deleted is null, cast(false as boolean), cast(true as boolean)) AS 
_hoodie_is_deleted FROM <SRC> a
   hoodie.table.type: MERGE_ON_READ
   hoodie.upsert.shuffle.parallelism: 200
   ```
   
   spark-submit is below
   ```
   spark-submit \
   --deploy-mode cluster \
   --master yarn \
   --driver-cores 4 \
   --executor-cores 2 \
   --driver-memory 4G \
   --executor-memory 8G \
   --num-executors 5 \
   --conf spark.driver.memoryOverhead=1G \
   --conf spark.executor.memoryOverhead=1G \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.shuffle.spill.compress=true \
   --conf spark.shuffle.compress=true \
   --conf spark.default.parallelism=200 \
   --conf spark.sql.shuffle.partitions=200 \
   --conf spark.rdd.compress=true \
   --conf spark.kryoserializer.buffer.max=512m \
   --conf spark.sql.hive.convertMetastoreParquet=false \
   --conf spark.driver.maxResultSize=8G \
   --conf spark.sql.debug.maxToStringFields=1000 \
   --conf spark.kerberos.access.hadoopFileSystems= \
   --conf spark.kerberos.keytab=test.keytab \
   --conf spark.kerberos.principal=test \
   --conf spark.eventLog.dir=hdfs://test \
   --conf spark.eventLog.enabled=true \
   --conf spark.eventLog.rolling.enabled=true \
   --conf spark.eventLog.rolling.maxFileSize=256m \
   --conf spark.history.fs.cleaner.enabled=true \
   --conf spark.history.fs.cleaner.interval=1d \
   --conf spark.history.fs.cleaner.maxAge=14d \
   --conf spark.history.fs.logDirectory=hdfs://test \
   --conf spark.history.retainedApplications=1000 \
   --conf spark.history.kerberos.enabled=true \
   --conf spark.history.kerberos.keytab=test.keytab \
   --conf spark.history.kerberos.principal=test \
   --conf spark.yarn.historyServer.address= \
   --conf 
spark.datasource.hive.warehouse.metastoreUri=thrift://ranger-hms-oozie1.bdp.bdata.ai:9083,thrift://ranger-hms-oozie2.bdp.bdata.ai:9083
 \
   --conf 'spark.driver.extraJavaOptions=-Dhdp.version=3.1.0.0-78 
-Dfile.encoding=UTF-8' \
   --conf 'spark.yarn.am.extraJavaOptions=-Dhdp.version=3.1.0.0-78 
-Dfile.encoding=UTF-8' \
   --conf spark.driver.extraClassPath=/etc/spark/conf/c3s \
   --conf spark.executor.extraClassPath=/etc/spark/conf/c3s \
   --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
   --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
   --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE= \
   --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE= \
   --conf 
spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/usr/hdp:/usr/hdp:ro
 \
   --conf 
spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/usr/hdp:/usr/hdp:ro \
   --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG= \
   --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG= \
   --conf spark.hadoop.dfs.client.ignore.namenode.default.kms.uri=true \
   --conf 
spark.hadoop.hive.metastore.uris=thrift://ranger-hms-oozie1.bdp.bdata.ai:9083,thrift://ranger-hms-oozie2.bdp.bdata.ai:9083
 \
   --conf spark.sql.catalogImplementation=hive \
   --conf 
'spark.sql.hive.hiveserver2.jdbc.url=jdbc:hive2://zk-etcd1.bdp.bdata.ai:2181,zk-etcd2.bdp.bdata.ai:2181,zk-etcd3.bdp.bdata.ai:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-batch'
 \
   --conf 
'spark.datasource.hive.warehouse.hs2.url.resolved=jdbc:hive2://zk-etcd1.bdp.bdata.ai:2181,zk-etcd2.bdp.bdata.ai:2181,zk-etcd3.bdp.bdata.ai:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-batch'
 \
   --conf spark.yarn.archive= \
   --conf spark.yarn.maxAppAttempts=1 \
   --conf spark.executorEnv.ENV=real \
   --conf spark.yarn.appMasterEnv.ENV=real \
   --conf spark.yarn.am.nodeLabelExpression= \
   --conf spark.yarn.executor.nodeLabelExpression= \
   --conf spark.executorEnv.ENV=real \
   --conf spark.yarn.appMasterEnv.ENV=real \
   --conf spark.executorEnv.SPARK_USER= \
   --conf spark.yarn.appMasterEnv.SPARK_USER= \
   --queue batch \
   --jars 
local:///home1/irteam/jars/hudi-spark3.3-bundle_2.12-0.14.0-zd.jar,local:///home1/irteam/jars/hudi-utilities-slim-bundle_2.12-0.14.0-zd.jar
 \
   --class hoodie.ingestion.DeltaStreamerWrapperJob \
   --hoodie-conf hoodie.upsert.shuffle.parallelism=200 \
   --hoodie-conf hoodie.insert.shuffle.parallelism=200 \
   --hoodie-conf hoodie.delete.shuffle.parallelism=200 \
   --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=200 \
   --hoodie-conf hoodie.clean.automatic=false \
   --hoodie-conf hoodie.datasource.write.precombine.field=__ordering_field \
   --hoodie-conf hoodie.streamer.source.dfs.root= \
   --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter= \
   --hoodie-conf compaction.schedule.enabled=false \
   --hoodie-conf hoodie.bloom.index.parallelism=100 \
   --props greeter_delete.properties \
   --table-type MERGE_ON_READ \
   --source-ordering-field \
   __ordering_field \
   --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
   --target-base-path greeter_test/ \
   --target-table greeter_test \
   --schemaprovider-class 
org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
   --op UPSERT \
   --enable-sync \
   --transformer-class 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
   --disable-compaction
   ```
   
   Using transformer sql above with creating a new table is works what I want.
   However, when using the above method on an existing hudi table, a merge 
error occurs.
   Please help me to solve.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to