waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011376344
Thank @danny0405
I am able to map the arraytype using below but not sure if I am doing it
correctly
private static HoodiePipeline.Builder createHudiPipeline(String targetTable,
Map<String, String> options) {
return HoodiePipeline.builder(targetTable)
.column("uuid VARCHAR(256)")
.column("name VARCHAR(10)")
.column("age INT")
.column("ts TIMESTAMP(3)")
.column("data ARRAY<ROW<measure_name VARCHAR(256), type
VARCHAR(10)>>")
.column("`partition` VARCHAR(20)")
.pk("uuid")
.partition("partition")
.options(options);
}
// Define the schema for Hudi records
public static final DataType ROW_ARRAY_DATA_TYPE =
ROW(DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256)), // record key
DataTypes.FIELD("type", DataTypes.VARCHAR(10))).notNull();
public static final DataType ROW_DATA_TYPE = ROW(
DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), //
record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
DataTypes.FIELD("data",DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE))
)
.notNull();
Now I am trying to figure out how to map it to rowdata where I
tried like
static class HudiDataSource implements MapFunction<Telemetry,
RowData> {
@Override
public RowData map(Telemetry kafkaRecord) throws Exception {
return
insertRow(StringData.fromString(kafkaRecord.getCampaignName()),
StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1),
StringData.fromString("par1"),insertRow(StringData.fromString(kafkaRecord.getCampaignName()),
StringData.fromString("Danny"))
);
}
}
but its falling with
static class HudiDataSource implements MapFunction<Telemetry, RowData> {
@Override
public RowData map(Telemetry kafkaRecord) throws Exception {
return
insertRow(StringData.fromString(kafkaRecord.getCampaignName()),
StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1),
StringData.fromString("par1"),insertRow(StringData.fromString(kafkaRecord.getCampaignName()),
StringData.fromString("Danny"))
);
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]