tsreaper commented on code in PR #430:
URL: https://github.com/apache/flink-table-store/pull/430#discussion_r1044204129
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java:
##########
@@ -36,4 +44,24 @@ public interface SnapshotEnumerator {
*/
@Nullable
DataTableScan.DataFilePlan enumerate();
+
+ static void validateContinuous(TableSchema schema) {
+ CoreOptions options = new CoreOptions(schema.options());
+ MergeEngine mergeEngine = options.mergeEngine();
+ HashMap<MergeEngine, String> mergeEngineDesc =
+ new HashMap<MergeEngine, String>() {
+ {
+ put(MergeEngine.PARTIAL_UPDATE, "Partial update");
+ put(MergeEngine.AGGREGATE, "Pre-aggregate");
+ }
+ };
+ if (schema.primaryKeys().size() > 0
+ && mergeEngineDesc.containsKey(mergeEngine)
+ && options.changelogProducer() != FULL_COMPACTION) {
+ throw new ValidationException(
+ mergeEngineDesc.get(mergeEngine)
+ + " continuous reading is not supported. "
+ + "You can use full compaction changelog producer
to support streaming reading.");
+ }
+ }
Review Comment:
Why not move this to `ContinuousDataFileSnapshotEnumerator` and rename it as
`validate`? This validation is only about streaming reads of data files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]