YannByron commented on code in PR #2532:
URL: https://github.com/apache/fluss/pull/2532#discussion_r2759273946
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala:
##########
@@ -36,6 +37,38 @@ trait FlussScan extends Scan {
}
}
+object FlussScan {
Review Comment:
Move these to `FlussBatch` or another common place shared with `MicroBatchStream`.
`OffsetsInitializer` affects how to plan (which is controlled in `FlussBatch`),
not how to generate `Batch` and `MicroBatchStream`. And keep the original
definition of `FlussXXXScan`.
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala:
##########
@@ -17,23 +17,37 @@
package org.apache.fluss.spark
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
import org.apache.fluss.config.ConfigBuilder.key
-import org.apache.fluss.config.ConfigOption
-import org.apache.spark.sql.internal.SQLConf.buildConf
+import java.time.Duration
object SparkFlussConf {
- val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
- .internal()
- .doc("If true, Spark will only read data from data lake snapshot or kv
snapshot, not execute merge them with log changes. This is a temporary
configuration that will be deprecated when read-optimized table(e.g.
`mytbl$ro`) is supported.")
- .booleanConf
- .createWithDefault(false)
+ val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss."
val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
- key(READ_OPTIMIZED.key)
+ key("read.optimized")
.booleanType()
- .defaultValue(READ_OPTIMIZED.defaultValue.get)
- .withDescription(READ_OPTIMIZED.doc)
+ .defaultValue(false)
+ .withDescription(
+ "If true, Spark will only read data from data lake snapshot or kv
snapshot, not execute merge them with log changes. This is a temporary
configuration that will be deprecated when read-optimized table(e.g.
`mytbl$ro`) is supported.")
+ object StartUpMode extends Enumeration {
+ val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+ }
+
+ val SCAN_START_UP_MODE: ConfigOption[String] =
Review Comment:
I suggest placing these common options, which are used by both Spark and Flink, in
`ConfigOptions`, like
https://github.com/apache/paimon/blob/a10a44892cd5e9dbac705762ed6774674357692f/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java#L947
in Paimon. We could perhaps do this in a separate PR.
##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala:
##########
@@ -38,6 +38,14 @@ class SparkPrimaryKeyTableReadTest extends
FlussSparkTestBase {
new Configuration()
}
+ override protected def beforeEach(): Unit = {
Review Comment:
All the configs are set to their default values, so this is meaningless; I suggest
removing it.
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala:
##########
@@ -17,23 +17,37 @@
package org.apache.fluss.spark
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
import org.apache.fluss.config.ConfigBuilder.key
-import org.apache.fluss.config.ConfigOption
-import org.apache.spark.sql.internal.SQLConf.buildConf
+import java.time.Duration
object SparkFlussConf {
- val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
- .internal()
- .doc("If true, Spark will only read data from data lake snapshot or kv
snapshot, not execute merge them with log changes. This is a temporary
configuration that will be deprecated when read-optimized table(e.g.
`mytbl$ro`) is supported.")
- .booleanConf
- .createWithDefault(false)
+ val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss."
val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
- key(READ_OPTIMIZED.key)
+ key("read.optimized")
.booleanType()
- .defaultValue(READ_OPTIMIZED.defaultValue.get)
- .withDescription(READ_OPTIMIZED.doc)
+ .defaultValue(false)
+ .withDescription(
+ "If true, Spark will only read data from data lake snapshot or kv
snapshot, not execute merge them with log changes. This is a temporary
configuration that will be deprecated when read-optimized table(e.g.
`mytbl$ro`) is supported.")
+ object StartUpMode extends Enumeration {
+ val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+ }
+
+ val SCAN_START_UP_MODE: ConfigOption[String] =
+ ConfigBuilder
+ .key("scan.startup.mode")
+ .stringType()
+ .defaultValue(StartUpMode.FULL.toString)
+ .withDescription("The start up mode when read Fluss table.")
+
+ val LOG_SCANNER_POLL_TIMEOUT: ConfigOption[Duration] =
+ ConfigBuilder
+ .key("log.scanner.poll.timeout")
Review Comment:
I prefer `scan.poll.timeout`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]