JingsongLi commented on pull request #1185:
URL: https://github.com/apache/iceberg/pull/1185#issuecomment-678073729
Hi @openinx , for the API level, I think we can do more:
- We can provide two API, one for RowData, one for Row, This can cover not
only most users of `DataStream`, but also the requirements of SQL layer.
- Users can just construct a sink from `DataStream` and `TableLoader`, other
information can be inferred.
Just like:
```
public class FlinkSink {
private FlinkSink() {
}
public static Builder<Row> forRow(DataStream<Row> input) {
return new Builder<>(input);
}
public static Builder<RowData> forRowData(DataStream<RowData> input) {
return new Builder<>(input);
}
private static class Builder<T> {
private final DataStream<T> input;
private TableLoader loader;
private Configuration hadoopConf;
private Table table;
private TableSchema tableSchema;
// ---------- Required options ------------
private Builder(DataStream<T> input) {
this.input = input;
}
public Builder tableLoader(TableLoader loader) {
this.loader = loader;
return this;
}
// ---------- Optional options ------------
public Builder table(Table newTable) {
this.table = newTable;
return this;
}
public Builder hadoopConf(Configuration newConf) {
this.hadoopConf = newConf;
return this;
}
public Builder tableSchema(TableSchema newSchema) {
this.tableSchema = newSchema;
return this;
}
@SuppressWarnings("unchecked")
public DataStreamSink<RowData> build() {
Preconditions.checkNotNull(input, "Input data stream shouldn't be
null");
Preconditions.checkNotNull(loader, "Table loader shouldn't be null");
if (hadoopConf == null) {
// load cluster conf
}
if (table == null) {
// load table from table loader
}
// tableSchema can be optional
DataStream<RowData> inputStream;
Class<T> inputClass = input.getType().getTypeClass();
if (inputClass == Row.class) {
DataType type;
if (tableSchema != null) {
type = tableSchema.toRowDataType();
} else {
type =
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(table.schema()));
}
DataStructureConverter converter =
DataStructureConverters.getConverter(type);
inputStream = input.map((MapFunction) converter::toInternal);
} else if (inputClass == RowData.class) {
inputStream = (DataStream<RowData>) input;
} else {
throw new IllegalArgumentException("Should be Row or RowData");
}
// create writer form inputStream.
// create committer.
// return DataStreamSink.
return null;
}
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]