yesemsanthoshkumar opened a new issue #4784:
URL: https://github.com/apache/hudi/issues/4784


   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes
   
   - Join the mailing list to engage in conversations and get faster support at [email protected].
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   I have a dataset whose records look like this:
   ```
   {
    "created_at": "2022-01-30T00:24:14Z",
    "city_id": "1",
    "__source_ts_ms": 123456789,
    "other columns"....
   }
   ```
   These records are produced by Debezium CDC with the ExtractNewRecordState transform. I ran DeltaStreamer in continuous mode with CustomKeyGenerator. The parquet files are created under the partitions I specified, but when I read the Hudi table through the Spark datasource, the partition column is missing from the dataframe.
   ```
   spark-submit \
     --deploy-mode cluster \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.hive.convertMetastoreParquet=false" \
     --jars "/usr/lib/hudi/hudi-utilities-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar" \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     /usr/lib/hudi/hudi-utilities-bundle.jar \
     --table-type MERGE_ON_READ \
     --target-base-path <<s3://bucket/tablenameprefix>> \
     --target-table <<tablename>> \
     --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
     --source-ordering-field __source_ts_ms  \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
     --op UPSERT \
     --hoodie-conf "group.id=<<consumergroupid>>" \
     --hoodie-conf "hoodie.deltastreamer.source.kafka.topic=<<kafka-topic-name>>" \
     --hoodie-conf "hoodie.embed.timeline.server=true" \
     --hoodie-conf "hoodie.compact.inline=false" \
     --hoodie-conf "hoodie.datasource.write.drop.partition.columns=false" \
     --hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator" \
     --hoodie-conf "hoodie.datasource.write.recordkey.field=id" \
     --hoodie-conf "hoodie.datasource.write.partitionpath.field=created_at:timestamp,city_id:simple" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ssZ" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy-MM-dd" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.timezone=GMT+5:30" \
     --hoodie-conf "hoodie.datasource.write.hive_style_partitioning=true" \
     --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.url=https://myschemaregistry/mysubject/versions/1" \
     --hoodie-conf "hoodie.deltastreamer.source.dfs.root=<<s3://bucket/anotherprefix>>" \
     --hoodie-conf "auto.offset.reset=earliest" \
     --hoodie-conf "hoodie.upsert.shuffle.parallelism=2" \
     --hoodie-conf "hoodie.insert.shuffle.parallelism=2" \
     --hoodie-conf "hoodie.delete.shuffle.parallelism=2" \
     --hoodie-conf "hoodie.bulkinsert.shuffle.parallelism=2" \
     --hoodie-conf "schema.registry.url=https://myschemaregistry/" \
     --hoodie-conf "bootstrap.servers=mykafkabroker1:9092" \
     --continuous
   ```
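
To make the intended partition layout concrete, here is a minimal sketch of how a path could be derived from the settings above (`created_at:timestamp,city_id:simple`, the input and output date formats, the `GMT+5:30` timezone, and hive-style partitioning). This is not Hudi's implementation: `partition_path` is a hypothetical helper, and the Python format strings are assumed stand-ins for the Java patterns in the config.

```python
from datetime import datetime, timedelta, timezone

# Assumed equivalents of the --hoodie-conf values above (not Hudi code).
INPUT_FORMAT = "%Y-%m-%dT%H:%M:%S%z"   # stands in for yyyy-MM-dd'T'HH:mm:ssZ
OUTPUT_FORMAT = "%Y-%m-%d"             # stands in for yyyy-MM-dd
TABLE_TZ = timezone(timedelta(hours=5, minutes=30))  # GMT+5:30


def partition_path(record: dict) -> str:
    """Hypothetical helper: hive-style path for created_at:timestamp,city_id:simple."""
    # Parse the timestamp string; a trailing "Z" is read as UTC.
    parsed = datetime.strptime(record["created_at"], INPUT_FORMAT)
    # Shift to the configured timezone before truncating to a date.
    day = parsed.astimezone(TABLE_TZ).strftime(OUTPUT_FORMAT)
    return f"created_at={day}/city_id={record['city_id']}"


row = {"id": 123, "created_at": "2022-01-30T00:24:14Z", "city_id": "1"}
print(partition_path(row))  # created_at=2022-01-30/city_id=1
```

Note that under these assumptions the partition date is computed in GMT+5:30, so a record late in the UTC day can land in the next day's partition.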
   
   I tried reading the dataset as follows:
   ```
   spark.read.format("org.apache.hudi").load("s3path")
   ```
   gives me
   ```
   id  | created_at           | city_id | _hoodie_partition_path | _hoodie_record_key
   123 | 2022-01-30T00:24:14Z | 1       | created_at=2021-10-30  | 123
   ```
   What I expected
   ```
   id  | created_at           | city_id | _hoodie_partition_path          | _hoodie_record_key | created_at
   123 | 2022-01-30T00:24:14Z | 1       | created_at=2021-10-30/city_id=1 | 123                | 2021-10-30
   ```
   The extra column need not be named created_at; a distinct name such as **_created_at** would be fine.
   
   ```
   spark
      .read
      .option("basepath", "s3://path")
      .parquet("s3://path/created_at=2022-01*/")
   ```
   and
   ```
   spark
      .read
      .option("basepath", "s3://path")
      .parquet("s3://path/created_at=2022-01*/city_id=1/")
   ```
   both give me
   
   ```
   id | created_at | city_id | _hoodie_partition_path         | _hoodie_record_key
   1  | 2022-01-30 | 1       | created_at=2022-01-30/hub_id=5 | 1
   ```
   Notice that created_at now holds only the date; the timestamp is gone.
   
   My questions are as follows:
   1. Is there a way to get both the partition value and the original column in the dataframe? In this case, I'd like the date and the timestamp in two separate columns.
   2. Alternatively, can the partition column be renamed in Hudi? My partitions look like created_at=yyyy-MM-dd/city_id=1/ and I'd prefer something like dt=yyyy-MM-dd/city_id=1/.
   3. If this is expected behaviour, how do I filter on created_at by date alone?
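
For question 3, one possible workaround is sketched below in plain Python rather than Spark. It assumes created_at stays an ISO-8601 UTC string such as `2022-01-30T00:24:14Z`, whose first 10 characters are the date and which sorts chronologically as text. `in_date_range` is a hypothetical helper, not a Hudi or Spark API, and the sample rows are illustrative.

```python
from datetime import date


def in_date_range(rows, start: date, end: date):
    """Hypothetical helper: keep rows whose created_at date falls in [start, end)."""
    lo, hi = start.isoformat(), end.isoformat()
    # The first 10 characters of an ISO-8601 timestamp are the yyyy-MM-dd date,
    # and such strings sort chronologically, so plain string comparison works.
    return [r for r in rows if lo <= r["created_at"][:10] < hi]


# Illustrative rows only; not taken from the real table.
rows = [
    {"id": 1, "created_at": "2021-12-31T23:59:59Z", "city_id": "1"},
    {"id": 2, "created_at": "2022-01-30T00:24:14Z", "city_id": "1"},
    {"id": 3, "created_at": "2022-02-01T00:00:00Z", "city_id": "2"},
]
january = in_date_range(rows, date(2022, 1, 1), date(2022, 2, 1))
print([r["id"] for r in january])  # [2]
```

The comparison here is on UTC dates, which can differ from the GMT+5:30 partition date for records near midnight.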
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a MySQL table with the following DDL
   
   ```
   CREATE TABLE `orders` (
     `id` int(11) NOT NULL AUTO_INCREMENT,
     `city_id` varchar(5) COLLATE utf8_unicode_ci NOT NULL,
     `created_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
     `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
     PRIMARY KEY (`id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
   ```
   
   2. Create a Debezium connector with the following configuration
   ```
   {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "tasks.max": "1",
        "database.history.kafka.topic": "hudi-history",
        "transforms": "unwrap",
        "tombstones.on.delete": "false",
        "snapshot.new.tables": "parallel",
        "database.history.skip.unparseable.ddl": "true",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "database.user": "username",
        "database.history.kafka.bootstrap.servers": "kafkabrokerhost:port",
        "database.server.name": "hudi-dev",
        "heartbeat.interval.ms": "30000",
        "database.port": "3306",
        "key.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "schemaregistryhost:port",
        "database.hostname": "dbhostnameOrIP",
        "database.password": "password",
        "value.converter.schemas.enable": "true",
        "name": "hudi-dev2",
        "transforms.unwrap.add.fields": "op,db,table,source.ts_ms,ts_ms",
        "table.include.list": "mydb.orders",
        "key.converter.schema.registry.url": "schemaregistryhost:port",
        "snapshot.mode": "initial"
   }
   ```
   
   3. Run Hudi DeltaStreamer with the configuration shown above
   4. Read the dataset using the methods shown above
   
   **Expected behavior**
   
   I expected the partition columns to be present in the dataframe as well so that I can filter on them.
   
   **Environment Description**
   
   * Hudi version : 0.7.0-amzn-0
   
   * Spark version : 3.1.1-amzn-0.1
   
   * Hive version : 3.1.2-amzn-4
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : s3
   
   * Running on Docker? (yes/no) : no
   
   
   

