xiearthur commented on issue #12661:
URL: https://github.com/apache/hudi/issues/12661#issuecomment-2599537734

   Yes, I’m sure. The job sets the runtime mode explicitly:
   env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
   
   Here is the complete code I am currently using.
   
   ```
   
       public static DataStream<RowData> readHudiDataStream(StreamExecutionEnvironment env, ParameterTool params, Map<String, String> schema) throws Exception {
   
           String tableName = params.get("read_tablename");
           String basePath = params.get("read_basepath");
           String primaryKey = params.get("read_primarykey");
           String hoodieTableType = params.get("read_hoodie_table_type");
           String precombing = params.get("read_precombing");
   
           // Default to COPY_ON_WRITE; switch to MERGE_ON_READ only when "mor" is requested.
           // Comparing from the literal side avoids a NullPointerException if the parameter is missing.
           String name = "mor".equals(hoodieTableType)
                   ? HoodieTableType.MERGE_ON_READ.name()
                   : HoodieTableType.COPY_ON_WRITE.name();
           Map<String, String> options = new HashMap<>();
           options.put(FlinkOptions.PATH.key(), basePath + tableName);
           options.put(FlinkOptions.TABLE_TYPE.key(), name);
           options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
           options.put(FlinkOptions.PRECOMBINE_FIELD.key(), precombing);
           options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
           options.put("read.tasks", "5");
   
           HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName);
           for (Map.Entry<String, String> entry : schema.entrySet()) {
               builder.column(entry.getKey() + " " + entry.getValue());
           }
           builder.pk(primaryKey)
                   .options(options);
   
           // Get DataStream of RowData
           DataStream<RowData> rowDataDS = builder.source(env);
   
           return rowDataDS;
       }
   ```
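
   For anyone reproducing this, the option-building part of the method above can be checked without a Flink cluster. The following is only a minimal sketch in plain Java: the class `HudiReadOptionsSketch`, its methods, and the example path are hypothetical, and the literal keys (`path`, `table.type`, `read.streaming.enabled`, `precombine.field`, `read.start-commit`) are assumed to match what the `FlinkOptions.*.key()` calls return, which can differ between Hudi versions. Unlike the original code, it inserts a `/` between the base path and the table name and rejects unknown table types instead of silently defaulting to copy-on-write.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   /** Hypothetical helper for illustration; not part of Hudi or of the job above. */
   final class HudiReadOptionsSketch {

       /** Joins base path and table name with exactly one '/' between them. */
       static String joinPath(String basePath, String tableName) {
           String base = basePath.endsWith("/")
                   ? basePath.substring(0, basePath.length() - 1)
                   : basePath;
           String name = tableName.startsWith("/") ? tableName.substring(1) : tableName;
           return base + "/" + name;
       }

       /** Maps "cow"/"mor" (case-insensitive) to a table type name; rejects anything else. */
       static String tableTypeName(String shortName) {
           if ("cow".equalsIgnoreCase(shortName)) {
               return "COPY_ON_WRITE";
           }
           if ("mor".equalsIgnoreCase(shortName)) {
               return "MERGE_ON_READ";
           }
           throw new IllegalArgumentException("Unsupported read_hoodie_table_type: " + shortName);
       }

       /** Builds the streaming-read options; the key strings are assumptions, see the note above. */
       static Map<String, String> readOptions(String basePath, String tableName,
                                              String tableType, String precombineField) {
           Map<String, String> options = new LinkedHashMap<>();
           options.put("path", joinPath(basePath, tableName));
           options.put("table.type", tableTypeName(tableType));
           options.put("read.streaming.enabled", "true");
           options.put("precombine.field", precombineField);
           options.put("read.start-commit", "earliest");
           options.put("read.tasks", "5");
           return options;
       }

       public static void main(String[] args) {
           // Example values only; substitute the real job parameters.
           Map<String, String> options = readOptions("hdfs:///warehouse/hudi", "orders", "mor", "ts");
           options.forEach((key, value) -> System.out.println(key + " = " + value));
       }
   }
   ```

   Printing the resulting map before passing it to `HoodiePipeline.Builder.options(...)` makes it easy to confirm that the path and table type are what the job is expected to read.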

