tuziling opened a new issue #3129:
URL: https://github.com/apache/iceberg/issues/3129
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AppendingData {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        log.info("AppendingData program starting ......");

        // Read the data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.0.0.53:9092");
        properties.setProperty("zookeeper.connect", "10.0.0.53:2181");
        properties.setProperty("group.id", "test-1451");
        String topicName = "test";
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties);
        kafkaConsumer.setStartFromLatest();
        // Split each record on spaces and emit (word length, word) tuples
        DataStream<Tuple2<Long, String>> kafkaData = env.addSource(kafkaConsumer)
                .flatMap(new FlatMapFunction<String, Tuple2<Long, String>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<Long, String>> out) throws Exception {
                        if (value != null && value.length() > 0) {
                            for (String word : value.split(" ")) {
                                if (word != null && word.length() > 0) {
                                    out.collect(new Tuple2<>((long) word.length(), word));
                                }
                            }
                        }
                    }
                });
        // Convert each tuple into a two-field RowData (BIGINT, STRING)
        DataStream<RowData> input = kafkaData.map(new MapFunction<Tuple2<Long, String>, RowData>() {
            @Override
            public RowData map(Tuple2<Long, String> value) throws Exception {
                GenericRowData genericRow = new GenericRowData(2);
                genericRow.setField(0, value.f0);
                genericRow.setField(1, StringData.fromString(value.f1));
                log.info("map function --> rowData={}", genericRow);
                return genericRow;
            }
        });
        // Build a custom Glue catalog backed by S3 and load the target table
        String catalogName = "songfuhao_catalog";
        Configuration hadoopConf = new Configuration();
        Map<String, String> props = new HashMap<>();
        props.put("warehouse", "s3://songfuhao-bucket/songfuhao");
        props.put("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog");
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        props.put("lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager");
        props.put("lock.table", "myGlueLockTable");
        props.put("AWS_ACCESS_KEY_ID", "<redacted>");
        props.put("AWS_SECRET_ACCESS_KEY", "<redacted>");
        props.put("AWS_DEFAULT_REGION", "cn-northwest-1");
        String impl = "org.apache.iceberg.aws.glue.GlueCatalog";
        CatalogLoader tzlCatalog = CatalogLoader.custom(catalogName, props, hadoopConf, impl);
        TableLoader tableLoader = TableLoader.fromCatalog(
                tzlCatalog, TableIdentifier.of("songfh", "sfuhao_test"));
        FlinkSink.forRowData(input)
                .tableLoader(tableLoader)
                // .overwrite(true)
                .build();

        env.execute("Test Iceberg DataStream");
    }
}
The Kafka data is consumed, but it never actually ends up in S3, and the job runs without any errors. Does anyone know what is happening?
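One thing I notice on re-reading the code: checkpointing is never enabled. As far as I understand, the Iceberg FlinkSink only commits data files when a Flink checkpoint completes, so without checkpointing the written files may never become visible in the table. A minimal sketch of what I mean (the 60-second interval is just an example value):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Assumption: the Iceberg sink commits on checkpoint completion, so a
    // checkpoint interval must be configured; 60s here is an arbitrary example.
    env.enableCheckpointing(60_000L); // interval in milliseconds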