YannByron commented on code in PR #2532:
URL: https://github.com/apache/fluss/pull/2532#discussion_r2759273946


##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala:
##########
@@ -36,6 +37,38 @@ trait FlussScan extends Scan {
   }
 }
 
+object FlussScan {

Review Comment:
   Move these to `FlussBatch` or another common place shared with 
`MicroBatchStream`. `OffsetsInitializer` affects how to plan (which is 
controlled in `FlussBatch`), not how to generate `Batch` and 
`MicroBatchStream`. Also, keep the original definition of `FlussXXXScan`.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala:
##########
@@ -17,23 +17,37 @@
 
 package org.apache.fluss.spark
 
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
 import org.apache.fluss.config.ConfigBuilder.key
-import org.apache.fluss.config.ConfigOption
 
-import org.apache.spark.sql.internal.SQLConf.buildConf
+import java.time.Duration
 
 object SparkFlussConf {
 
-  val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
-    .internal()
-    .doc("If true, Spark will only read data from data lake snapshot or kv 
snapshot, not execute merge them with log changes. This is a temporary 
configuration that will be deprecated when read-optimized table(e.g. 
`mytbl$ro`) is supported.")
-    .booleanConf
-    .createWithDefault(false)
+  val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss."
 
   val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
-    key(READ_OPTIMIZED.key)
+    key("read.optimized")
       .booleanType()
-      .defaultValue(READ_OPTIMIZED.defaultValue.get)
-      .withDescription(READ_OPTIMIZED.doc)
+      .defaultValue(false)
+      .withDescription(
+        "If true, Spark will only read data from data lake snapshot or kv 
snapshot, not execute merge them with log changes. This is a temporary 
configuration that will be deprecated when read-optimized table(e.g. 
`mytbl$ro`) is supported.")
 
+  object StartUpMode extends Enumeration {
+    val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+  }
+
+  val SCAN_START_UP_MODE: ConfigOption[String] =

Review Comment:
   I suggest placing these common options, used by both Spark and Flink, in 
`ConfigOptions`, like 
https://github.com/apache/paimon/blob/a10a44892cd5e9dbac705762ed6774674357692f/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java#L947
 in Paimon. Perhaps we can do this in a separate PR.



##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala:
##########
@@ -38,6 +38,14 @@ class SparkPrimaryKeyTableReadTest extends 
FlussSparkTestBase {
     new Configuration()
   }
 
+  override protected def beforeEach(): Unit = {

Review Comment:
   All of these configs are set to their default values, so this override is 
meaningless; I suggest removing it.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala:
##########
@@ -17,23 +17,37 @@
 
 package org.apache.fluss.spark
 
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
 import org.apache.fluss.config.ConfigBuilder.key
-import org.apache.fluss.config.ConfigOption
 
-import org.apache.spark.sql.internal.SQLConf.buildConf
+import java.time.Duration
 
 object SparkFlussConf {
 
-  val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
-    .internal()
-    .doc("If true, Spark will only read data from data lake snapshot or kv 
snapshot, not execute merge them with log changes. This is a temporary 
configuration that will be deprecated when read-optimized table(e.g. 
`mytbl$ro`) is supported.")
-    .booleanConf
-    .createWithDefault(false)
+  val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss."
 
   val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
-    key(READ_OPTIMIZED.key)
+    key("read.optimized")
       .booleanType()
-      .defaultValue(READ_OPTIMIZED.defaultValue.get)
-      .withDescription(READ_OPTIMIZED.doc)
+      .defaultValue(false)
+      .withDescription(
+        "If true, Spark will only read data from data lake snapshot or kv 
snapshot, not execute merge them with log changes. This is a temporary 
configuration that will be deprecated when read-optimized table(e.g. 
`mytbl$ro`) is supported.")
 
+  object StartUpMode extends Enumeration {
+    val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+  }
+
+  val SCAN_START_UP_MODE: ConfigOption[String] =
+    ConfigBuilder
+      .key("scan.startup.mode")
+      .stringType()
+      .defaultValue(StartUpMode.FULL.toString)
+      .withDescription("The start up mode when read Fluss table.")
+
+  val LOG_SCANNER_POLL_TIMEOUT: ConfigOption[Duration] =
+    ConfigBuilder
+      .key("log.scanner.poll.timeout")

Review Comment:
   I prefer `scan.poll.timeout`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to