tibrewalpratik17 commented on code in PR #13837:
URL: https://github.com/apache/pinot/pull/13837#discussion_r1721623358
##########
pinot-connectors/pinot-flink-connector/README.md:
##########
@@ -39,9 +44,31 @@ srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO
execEnv.execute();
```
+## Quick start for realtime(upsert) table backfill
+```java
+// Set up flink env and data source
+StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+execEnv.setParallelism(2); // mandatory for upsert tables wi
+DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
+
+// Create a ControllerRequestClient to fetch Pinot schema and table config
+
Review Comment:
nit: let's add these lines here as well for completeness
```java
HttpClient httpClient = HttpClient.getInstance();
ControllerRequestClient client = new ControllerRequestClient(
ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
```
##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -124,12 +132,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat
"batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);
Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
+ batchConfigMap.put(BatchConfigProperties.PARTITION_ID, Integer.toString(_indexOfSubtask));
+ batchConfigMap.put(BatchConfigProperties.SEQUENCE_ID, Integer.toString(_seqId));
+ batchConfigMap.put(BatchConfigProperties.SEGMENT_CREATION_TIME_MS, String.valueOf(_segmentCreationTimeMs));
+ batchConfigMap.put(
+ BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX,
+ _segmentNamePrefix);
+
+ // generate segment name for simple segment name generator type(non upsert tables)
String segmentNamePostfixProp = String.format("%s.%s", BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
BatchConfigProperties.SEGMENT_NAME_POSTFIX);
String segmentSuffix = batchConfigMap.get(segmentNamePostfixProp);
segmentSuffix = segmentSuffix == null ? String.valueOf(_indexOfSubtask) : segmentSuffix + "_" + _indexOfSubtask;
batchConfigMap.put(segmentNamePostfixProp, segmentSuffix);
+ // For upsert tables must use the UploadedRealtimeSegmentName for right assignment of segments
+ if (_tableConfig.isUpsertEnabled()) {
+ batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+ BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME);
+ }
+
+
Review Comment:
Not introduced in this PR, but since correct segment assignment is strictly
required only for upsert tables and not for other realtime tables, do you
think `SegmentNameGeneratorType.UPLOADED_UPSERT` would have been a better name?
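
For context on the question, here is a minimal standalone sketch of the naming logic in the hunk above, using a plain `Map`. Everything in it is illustrative: the class `SegmentNamingSketch`, the string keys, and the `"uploadedRealtime"` value are placeholders standing in for the `BatchConfigProperties` constants, not the actual Pinot values.

```java
import java.util.HashMap;
import java.util.Map;

public class SegmentNamingSketch {
  // Placeholder keys and value (assumptions); the real ones are defined in BatchConfigProperties.
  public static final String POSTFIX_KEY = "generator.postfix";
  public static final String TYPE_KEY = "generator.type";
  public static final String UPLOADED_REALTIME = "uploadedRealtime";

  // Mirrors the hunk: append the subtask index to any configured postfix,
  // then switch the generator type only when upsert is enabled.
  public static void configure(Map<String, String> batchConfigMap, int indexOfSubtask, boolean upsertEnabled) {
    String suffix = batchConfigMap.get(POSTFIX_KEY);
    suffix = suffix == null ? String.valueOf(indexOfSubtask) : suffix + "_" + indexOfSubtask;
    batchConfigMap.put(POSTFIX_KEY, suffix);
    if (upsertEnabled) {
      batchConfigMap.put(TYPE_KEY, UPLOADED_REALTIME);
    }
  }

  public static void main(String[] args) {
    Map<String, String> nonUpsert = new HashMap<>();
    configure(nonUpsert, 2, false);
    System.out.println(nonUpsert);

    Map<String, String> upsert = new HashMap<>();
    upsert.put(POSTFIX_KEY, "backfill");
    configure(upsert, 0, true);
    System.out.println(upsert);
  }
}
```

In this sketch the generator type is only touched on the upsert branch, which is the behavior the naming question is about.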
##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -60,13 +63,17 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo
private TableConfig _tableConfig;
private Schema _schema;
+ private String _segmentNamePrefix;
+ @Nullable private Long _overriddenSegmentCreationTimeMs;
Review Comment:
Let's name this class variable `segmentCreationTimeMs`. Having `overridden`
in the name may be confusing, given that this variable does not override any
other value in this class.
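
A minimal sketch of what the suggested name could look like, written with the leading underscore used by the other fields in this class. The class `CreationTimeSketch` and the `resolve` method are hypothetical, and the fallback to a caller-supplied clock value when the field is null is an assumption for illustration, not something shown in this diff.

```java
public class CreationTimeSketch {
  // Null means no creation time was supplied by the caller.
  private final Long _segmentCreationTimeMs;

  public CreationTimeSketch(Long segmentCreationTimeMs) {
    _segmentCreationTimeMs = segmentCreationTimeMs;
  }

  // Assumed fallback: use the given clock reading when no value was supplied.
  public long resolve(long nowMs) {
    return _segmentCreationTimeMs != null ? _segmentCreationTimeMs : nowMs;
  }

  public static void main(String[] args) {
    System.out.println(new CreationTimeSketch(null).resolve(System.currentTimeMillis()));
    System.out.println(new CreationTimeSketch(1700000000000L).resolve(System.currentTimeMillis()));
  }
}
```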
##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -49,6 +50,8 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo
public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000;
+ public static final String DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX = "flink";
Review Comment:
Should we use "uploaded_flink" as the default prefix for more clarity?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]