geonyeongkim commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1469934370

   Hello.
   I looked at HoodieFlinkStreamer on GitHub and used JsonRowDataDeserializationSchema to troubleshoot the SortOperator issue.
   
   I have a few questions about it.
   
   ## 1. BULK_INSERT 
   If the operation is set to BULK_INSERT, no error occurs.
   
   However, the job only consumes the Kafka messages and does not actually create any Parquet files in HDFS.
   
   My code simply writes Kafka messages to a Hudi table on HDFS.
   
   ```kotlin
   @JvmStatic
   fun main(args: Array<String>) {
       val env = StreamExecutionEnvironment.getExecutionEnvironment()
       env.enableCheckpointing(5000)
   
       val props = Configuration()
       props.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, "avro schema")
   
       val rowType = AvroSchemaConverter
           .convertToDataType(StreamerUtil.getSourceSchema(props))
           .logicalType as RowType
       
       val kafkaSource = KafkaSource.builder<RowData>()
           .setBootstrapServers(bootstrapServers)
           .setTopics(topic)
           .setGroupId(SampleHudiApp::class.java.name)
           .setClientIdPrefix(UUID.randomUUID().toString())
           .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
           .setDeserializer(CustomJsonRowDataDeserializationSchema(
               rowType,
               InternalTypeInfo.of(rowType),
               false,
               true,
               TimestampFormat.ISO_8601
           ))
           .build()
       HoodiePipeline.builder("hudi_test_table")
           .column("id BIGINT")
           .column("name STRING")
           .column("`partition_path` STRING")
           .column("ts BIGINT")
           .column("dc STRING")
           .column("op STRING")
           .pk("id")
           .partition("partition_path")
           .options(mapOf(
               FlinkOptions.PATH.key() to "hdfs:///user/geonyeong.kim/hudi_flink_test",
               FlinkOptions.TABLE_TYPE.key() to HoodieTableType.COPY_ON_WRITE.name,
               FlinkOptions.INDEX_GLOBAL_ENABLED.key() to "false"
           ))
           .sink(env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "hudi_source"), true)
       env.execute("HUDI STREAM SINK")
   }
   ```
   
   **If the WRITE_BULK_INSERT_SORT_INPUT setting is false, it works normally.**
   
   **Is this a bug? Or is there a way to make BULK_INSERT work while WRITE_BULK_INSERT_SORT_INPUT is set to true?**
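   
   For reference, the two settings involved can be sketched as follows. This is only a minimal sketch: `bulkInsertOptions` is a hypothetical helper that is not part of Hudi, and the key strings are my assumption of what `FlinkOptions.OPERATION.key()` and `FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT.key()` resolve to.
   
   ```kotlin
   // Hypothetical helper, not part of Hudi: builds the extra table options
   // for a bulk insert with or without the sort step.
   // The keys are plain strings so the sketch compiles without the Hudi
   // dependency; they are assumed to match FlinkOptions.OPERATION.key() and
   // FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT.key().
   fun bulkInsertOptions(sortInput: Boolean): Map<String, String> = mapOf(
       "write.operation" to "bulk_insert",
       "write.bulk_insert.sort_input" to sortInput.toString()
   )
   
   fun main() {
       // These entries would be merged into the map passed to
       // HoodiePipeline.builder(...).options(...).
       val options = bulkInsertOptions(sortInput = false)
       options.forEach { (key, value) -> println("$key=$value") }
   }
   ```
   
   Using the `FlinkOptions` constants instead of raw strings, as in the code above, avoids depending on the exact key spelling.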
   
   ## 2. BULK_INSERT vs APPEND
   
   I looked at the org.apache.hudi.sink.utils.Pipelines class and confirmed that BulkInsertWriteFunction is used for bulk_insert mode and AppendWriteFunction is used for append mode.
   
   However, if the index type is not BUCKET, BulkInsertWriteFunction uses BulkInsertWriterHelper.
   
   AppendWriteFunction also uses BulkInsertWriterHelper.
   **Then, if the index type is FLINK_STATE, will the two behave the same?**
   
   - [WriterHelpers](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java#L35)
   - [AppendWriteFunction](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java#L115)
   
   ## 3. Compression
   
   I want to apply compression to the Parquet files when writing to a COW table with Flink.
   
   I confirmed that, in Flink, Hudi creates the HoodieFlinkWriteClient in FlinkWriteClients based on the FlinkOptions values, and that each write function uses it.
   
   So I overrode the [FlinkWriteClients](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java#L208) class and added the parquetCompressionCodec("gzip") setting.
   
   <img width="898" alt="image" src="https://user-images.githubusercontent.com/31622350/225310483-488540f4-9055-4d7d-838d-6ba6d52e74c1.png">
   
   **However, compression was not applied.**
   **Is this not supported in Flink?**

