tibrewalpratik17 commented on code in PR #13837:
URL: https://github.com/apache/pinot/pull/13837#discussion_r1721623358


##########
pinot-connectors/pinot-flink-connector/README.md:
##########
@@ -39,9 +44,31 @@ srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO
 execEnv.execute();
 ```
 
+## Quick start for realtime(upsert) table backfill
+```java
+// Set up flink env and data source
+StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+execEnv.setParallelism(2); // mandatory for upsert tables wi
+DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
+
+// Create a ControllerRequestClient to fetch Pinot schema and table config
+

Review Comment:
   Nit: let's add these lines here as well, for completeness:
   ```java
   HttpClient httpClient = HttpClient.getInstance();
   ControllerRequestClient client = new ControllerRequestClient(
       ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
   ```



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -124,12 +132,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat
         "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);
 
     Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
+    batchConfigMap.put(BatchConfigProperties.PARTITION_ID, Integer.toString(_indexOfSubtask));
+    batchConfigMap.put(BatchConfigProperties.SEQUENCE_ID, Integer.toString(_seqId));
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_CREATION_TIME_MS, String.valueOf(_segmentCreationTimeMs));
+    batchConfigMap.put(
+        BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX,
+        _segmentNamePrefix);
+
+    // generate segment name for simple segment name generator type(non upsert tables)
     String segmentNamePostfixProp = String.format("%s.%s", BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
         BatchConfigProperties.SEGMENT_NAME_POSTFIX);
     String segmentSuffix = batchConfigMap.get(segmentNamePostfixProp);
     segmentSuffix = segmentSuffix == null ? String.valueOf(_indexOfSubtask) : segmentSuffix + "_" + _indexOfSubtask;
     batchConfigMap.put(segmentNamePostfixProp, segmentSuffix);
 
+    // For upsert tables must use the UploadedRealtimeSegmentName for right assignment of segments
+    if (_tableConfig.isUpsertEnabled()) {
+      batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+          BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME);
+    }
+

Review Comment:
   Not introduced in this PR, but since correct segment assignment is strictly
   required only for upsert tables, and not for other (non-upsert) realtime
   tables, do you think `SegmentNameGeneratorType.UPLOADED_UPSERT` would have
   been a better name?
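
For reference while reading this hunk, here is a minimal, standalone sketch of the naming decisions it makes: the postfix falls back to the subtask index (or gets the index appended), and the generator type is switched only when upsert is enabled. The class name, the two map keys, and the `"uploadedRealtime"` value are hypothetical placeholders, not the actual `BatchConfigProperties` constants.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch; not the FlinkSegmentWriter implementation.
class SegmentNamingSketch {
  // Hypothetical keys and value, standing in for the BatchConfigProperties constants.
  static final String POSTFIX_KEY = "sketch.segment.name.postfix";
  static final String GENERATOR_TYPE_KEY = "sketch.segment.name.generator.type";
  static final String UPLOADED_REALTIME = "uploadedRealtime";

  // Mirrors the ternary in the hunk: use the subtask index alone when no
  // postfix is configured, otherwise append it to the configured postfix.
  static String resolveSuffix(String configuredSuffix, int indexOfSubtask) {
    return configuredSuffix == null
        ? String.valueOf(indexOfSubtask)
        : configuredSuffix + "_" + indexOfSubtask;
  }

  // Applies the postfix to the config map, and sets the generator type
  // only for upsert-enabled tables.
  static void applyNaming(Map<String, String> batchConfigMap, int indexOfSubtask, boolean upsertEnabled) {
    batchConfigMap.put(POSTFIX_KEY, resolveSuffix(batchConfigMap.get(POSTFIX_KEY), indexOfSubtask));
    if (upsertEnabled) {
      batchConfigMap.put(GENERATOR_TYPE_KEY, UPLOADED_REALTIME);
    }
  }

  public static void main(String[] args) {
    Map<String, String> batchConfigMap = new HashMap<>();
    batchConfigMap.put(POSTFIX_KEY, "backfill");
    applyNaming(batchConfigMap, 1, true);
    System.out.println(batchConfigMap);
  }
}
```

Note that in this sketch the postfix is written for upsert tables too, as in the hunk; whether the uploaded-realtime generator actually reads it is not something the diff shows.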



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -60,13 +63,17 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo
   private TableConfig _tableConfig;
   private Schema _schema;
 
+  private String _segmentNamePrefix;
+  @Nullable private Long _overriddenSegmentCreationTimeMs;

Review Comment:
   Let's name this class variable `segmentCreationTimeMs` here. The `overridden`
   prefix may be confusing, given that this variable does not override any other
   value in this class.
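
A rough sketch of how the field could read with the suggested name. It assumes that a null value means "no explicit creation time was supplied" and that the caller then falls back to a current timestamp; both the fallback and the `CreationTimeSketch` class are assumptions for illustration, not code from this PR.

```java
// Standalone sketch; not the PinotSinkFunction implementation.
class CreationTimeSketch {
  // Suggested name, with the underscore prefix used by the other fields in the hunk.
  // Null is assumed to mean that no explicit creation time was supplied.
  private final Long _segmentCreationTimeMs;

  CreationTimeSketch(Long segmentCreationTimeMs) {
    _segmentCreationTimeMs = segmentCreationTimeMs;
  }

  // Returns the supplied creation time, or the given fallback when none was set.
  long resolve(long fallbackMs) {
    return _segmentCreationTimeMs == null ? fallbackMs : _segmentCreationTimeMs;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(new CreationTimeSketch(null).resolve(now) == now);
    System.out.println(new CreationTimeSketch(1700000000000L).resolve(now));
  }
}
```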



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -49,6 +50,8 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo
   public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
   public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000;
 
+  public static final String DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX = "flink";

Review Comment:
   Should we make the default prefix "uploaded_flink" for more clarity?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

