github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3239029372
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -135,22 +152,26 @@ public String getSourceType() {
@Override
public Offset getNextOffset(StreamingJobProperties jobProps, Map<String,
String> properties) {
- JdbcOffset nextOffset = new JdbcOffset();
- if (!remainingSplits.isEmpty()) {
- int splitsNum = Math.min(remainingSplits.size(),
snapshotParallelism);
- List<SnapshotSplit> snapshotSplits = new
ArrayList<>(remainingSplits.subList(0, splitsNum));
- nextOffset.setSplits(snapshotSplits);
- return nextOffset;
- } else if (currentOffset != null && currentOffset.snapshotSplit()) {
- // initial mode: snapshot to binlog
- // snapshot-only mode must be intercepted by hasReachedEnd()
before reaching here
- BinlogSplit binlogSplit = new BinlogSplit();
- binlogSplit.setFinishedSplits(finishedSplits);
- nextOffset.setSplits(Collections.singletonList(binlogSplit));
- return nextOffset;
- } else {
- // only binlog
- return currentOffset == null ? new
JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset;
+ synchronized (splitsLock) {
+ JdbcOffset nextOffset = new JdbcOffset();
+ if (!remainingSplits.isEmpty()) {
+ int splitsNum = Math.min(remainingSplits.size(),
snapshotParallelism);
+ List<SnapshotSplit> snapshotSplits = new
ArrayList<>(remainingSplits.subList(0, splitsNum));
+ nextOffset.setSplits(snapshotSplits);
+ return nextOffset;
+ } else if (currentOffset != null && currentOffset.snapshotSplit()
&& noMoreSplits()) {
+ // initial mode: snapshot to binlog. noMoreSplits() guards
against switching while
+ // splitting is still in progress (remainingSplits empty
doesn't mean fully cut).
+ // snapshot-only mode is intercepted by hasReachedEnd() before
reaching here.
+ BinlogSplit binlogSplit = new BinlogSplit();
+ binlogSplit.setFinishedSplits(new ArrayList<>(finishedSplits));
+ nextOffset.setSplits(Collections.singletonList(binlogSplit));
+ return nextOffset;
+ } else {
+ // only binlog
+ return currentOffset == null
+ ? new JdbcOffset(Collections.singletonList(new
BinlogSplit())) : currentOffset;
+ }
Review Comment:
When `remainingSplits` is empty but `currentOffset` is still a snapshot
offset and `noMoreSplits()` is false, this fallback returns the just-committed
snapshot offset again. `onStreamTaskSuccess()` creates the next task
immediately, before the scheduler necessarily runs `advanceSplitsIfNeed()` to
publish the next batch; that task will re-read the same snapshot splits and
duplicate data. Please make this state defer task creation instead of reusing
`currentOffset` (for example, have the success path check
`hasMoreDataToConsume()`/advance splits before creating the next task, or make
`getNextOffset()` return a non-consumable/no-op state until `remainingSplits`
is populated).
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_uneven.groovy:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import java.util.concurrent.CopyOnWriteArraySet
+import java.util.concurrent.atomic.AtomicBoolean
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Exercises the uneven splitter path (VARCHAR PK bypasses
isEvenlySplitColumn() and
+// goes through splitOneUnevenlySizedChunk -> per-chunk MAX(...) probe SQL)
AND verifies
+// that splitting actually happens in batches: with batch_size shrunk to 5, the
+// streaming_job_meta.chunk_list must grow incrementally (multiple distinct
lengths
+// observed during snapshot) — a single-shot synchronous splitter would write
the
+// full list in one UPSERT and we'd only ever see one length.
+suite("test_streaming_postgres_job_async_split_uneven",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_async_split_uneven_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_async_split_uneven"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def totalRows = 500
+ int expectedChunks = (int) Math.ceil(totalRows / 5.0) // 100 chunks at
split_size=5
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // Shrink the FE-side batch size so 100 chunks need ~20 RPCs instead
of 1 — needed
+ // to make the "in batches" assertion observable. Default 100 would
otherwise let
+ // cdc_client return all 100 chunks in a single RPC even on the uneven
path.
+ def origBatchSize = sql("""ADMIN SHOW FRONTEND CONFIG LIKE
'streaming_cdc_fetch_splits_batch_size'""")
+ .get(0).get(1)
+ sql """ADMIN SET FRONTEND CONFIG
('streaming_cdc_fetch_splits_batch_size' = '5')"""
+
+ try {
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "id" varchar(20) PRIMARY KEY,
+ "name" varchar(200),
+ "age" int4
+ )"""
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${pgDB}.${pgSchema}.${table1} (id,
name, age) VALUES ")
+ for (int i = 1; i <= totalRows; i++) {
+ if (i > 1) sb.append(", ")
+ String key = "k_" + String.format("%05d", i)
+ sb.append("('${key}', 'name_${i}', ${i})")
+ }
+ sql sb.toString()
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "5",
+ "snapshot_parallelism" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ def jobIdRow = sql """select Id from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobIdRow.size() == 1
+ def jobId = jobIdRow.get(0).get(0).toString()
+
+ // Background sampler: poll streaming_job_meta.chunk_list length
every 150ms while
+ // the splitter runs. With batch_size=5 and ~20 RPCs the
chunk_list grows in steps,
+ // so we should observe several distinct lengths.
+ def lengthsSeen = new CopyOnWriteArraySet<Integer>()
+ def samplerStop = new AtomicBoolean(false)
+ def sampler = Thread.start {
+ while (!samplerStop.get()) {
+ try {
+ def r = sql """SELECT json_length(chunk_list)
+ FROM
internal.__internal_schema.streaming_job_meta
+ WHERE job_id='${jobId}'"""
+ if (r.size() > 0 && r.get(0).get(0) != null) {
+ lengthsSeen.add(r.get(0).get(0) as int)
+ }
+ } catch (Throwable ignored) { /* table may not exist yet;
retry */ }
+ try { Thread.sleep(150) } catch (InterruptedException ie)
{ break }
+ }
+ }
+
+ try {
+ Awaitility.await().atMost(600, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """SELECT COUNT(*) FROM
${currentDb}.${table1}"""
+ log.info("doris row count: ${cnt}")
+ cnt.size() == 1 && cnt.get(0).get(0) == totalRows
+ }
+ )
+ } catch (Exception ex) {
+ samplerStop.set(true)
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+ samplerStop.set(true)
+ sampler.join(5000)
+ log.info("chunk_list lengths observed during snapshot:
${lengthsSeen.toSorted()}")
+
+ // Must have reached the full chunk count by the end.
+ assert lengthsSeen.contains(expectedChunks) :
+ "chunk_list never reached the full ${expectedChunks}
chunks, observed: ${lengthsSeen.toSorted()}"
Review Comment:
Same sampler race as the MySQL uneven test: the row-count await can complete
before this background sampler has observed the final `chunk_list` value, so
the assertion can fail even when async splitting is correct. Please add a
synchronous final `json_length(chunk_list)` check or await `expectedChunks`
explicitly before stopping the sampler.
##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_async_split.groovy:
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import java.util.concurrent.CopyOnWriteArraySet
+import java.util.concurrent.atomic.AtomicBoolean
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// cdc_stream TVF path + uneven splitter (VARCHAR PK). With batch_size shrunk
so
+// the splitter is forced to multiple RPCs, streaming_job_meta.chunk_list must
grow
+// in steps — one-shot splitting would land the full list in a single UPSERT.
+suite("test_streaming_job_cdc_stream_postgres_async_split",
+ "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_job_cdc_stream_postgres_async_split"
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_postgres_async_split_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def pgTable = "test_streaming_job_cdc_stream_postgres_async_split_src"
+ def totalRows = 500
+ int expectedChunks = (int) Math.ceil(totalRows / 5.0)
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `id` varchar(20) NULL,
+ `name` varchar(200) NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // Shrink batch_size so the splitter must make multiple RPCs —
otherwise the
+ // default 100 lets 100 chunks fit in one shot and the "in batches"
assertion
+ // becomes vacuous on the uneven path too.
+ def origBatchSize = sql("""ADMIN SHOW FRONTEND CONFIG LIKE
'streaming_cdc_fetch_splits_batch_size'""")
+ .get(0).get(1)
+ sql """ADMIN SET FRONTEND CONFIG
('streaming_cdc_fetch_splits_batch_size' = '5')"""
+
+ try {
+ // VARCHAR PK bypasses isEvenlySplitColumn() -> uneven splitter
path.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+ "id" varchar(20) PRIMARY KEY,
+ "name" varchar(200),
+ "age" int4
+ )"""
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (id,
name, age) VALUES ")
+ for (int i = 1; i <= totalRows; i++) {
+ if (i > 1) sb.append(", ")
+ String key = "k_" + String.format("%05d", i)
+ sb.append("('${key}', 'name_${i}', ${i})")
+ }
+ sql sb.toString()
+ }
+
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (id,
name, age)
+ SELECT id, name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "initial",
+ "snapshot_split_size" = "5",
+ "snapshot_parallelism" = "2"
+ )
+ """
+
+ def jobIdRow = sql """select Id from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobIdRow.size() == 1
+ def jobId = jobIdRow.get(0).get(0).toString()
+
+ def lengthsSeen = new CopyOnWriteArraySet<Integer>()
+ def samplerStop = new AtomicBoolean(false)
+ def sampler = Thread.start {
+ while (!samplerStop.get()) {
+ try {
+ def r = sql """SELECT json_length(chunk_list)
+ FROM
internal.__internal_schema.streaming_job_meta
+ WHERE job_id='${jobId}'"""
+ if (r.size() > 0 && r.get(0).get(0) != null) {
+ lengthsSeen.add(r.get(0).get(0) as int)
+ }
+ } catch (Throwable ignored) { /* meta table may not have
row yet */ }
+ try { Thread.sleep(150) } catch (InterruptedException ie)
{ break }
+ }
+ }
+
+ try {
+ Awaitility.await().atMost(600, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """SELECT COUNT(*) FROM
${currentDb}.${dorisTable}"""
+ log.info("doris row count: ${cnt}")
+ cnt.size() == 1 && cnt.get(0).get(0) == totalRows
+ }
+ )
+ } catch (Exception ex) {
+ samplerStop.set(true)
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+ samplerStop.set(true)
+ sampler.join(5000)
+ log.info("chunk_list lengths observed during snapshot:
${lengthsSeen.toSorted()}")
+
+ assert lengthsSeen.contains(expectedChunks) :
+ "chunk_list never reached the full ${expectedChunks}
chunks, observed: ${lengthsSeen.toSorted()}"
Review Comment:
This has the same timing race: `totalRows` may be visible in Doris before
the sampler records the final `chunk_list` length, and the sampler is stopped
immediately afterward. Please verify or await the final
`json_length(chunk_list)` synchronously before asserting
`lengthsSeen.contains(expectedChunks)`.
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_async_split_uneven.groovy:
##########
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import java.util.concurrent.CopyOnWriteArraySet
+import java.util.concurrent.atomic.AtomicBoolean
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// MySQL uneven splitter path — VARCHAR PK forces splitOneUnevenlySizedChunk
on the
+// MySQL side (separate impl from PG: MySqlSourceReader.resolveSplitKeyClass
lives
+// in cdc_client). batch_size shrunk so chunk_list growth is observably
stepped.
+suite("test_streaming_mysql_job_async_split_uneven",
+ "p0,external,mysql,external_docker,external_docker_mysql,nondatalake")
{
+ def jobName = "test_streaming_mysql_job_async_split_uneven_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_mysql_async_split_uneven"
+ def mysqlDb = "test_cdc_db"
+ def totalRows = 500
+ int expectedChunks = (int) Math.ceil(totalRows / 5.0)
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ def origBatchSize = sql("""ADMIN SHOW FRONTEND CONFIG LIKE
'streaming_cdc_fetch_splits_batch_size'""")
+ .get(0).get(1)
+ sql """ADMIN SET FRONTEND CONFIG
('streaming_cdc_fetch_splits_batch_size' = '5')"""
+
+ try {
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `id` varchar(20) NOT NULL,
+ `name` varchar(200),
+ `age` int,
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${mysqlDb}.${table1} (id, name, age)
VALUES ")
+ for (int i = 1; i <= totalRows; i++) {
+ if (i > 1) sb.append(", ")
+ String key = "k_" + String.format("%05d", i)
+ sb.append("('${key}', 'name_${i}', ${i})")
+ }
+ sql sb.toString()
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "5",
+ "snapshot_parallelism" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ def jobIdRow = sql """select Id from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobIdRow.size() == 1
+ def jobId = jobIdRow.get(0).get(0).toString()
+
+ def lengthsSeen = new CopyOnWriteArraySet<Integer>()
+ def samplerStop = new AtomicBoolean(false)
+ def sampler = Thread.start {
+ while (!samplerStop.get()) {
+ try {
+ def r = sql """SELECT json_length(chunk_list)
+ FROM
internal.__internal_schema.streaming_job_meta
+ WHERE job_id='${jobId}'"""
+ if (r.size() > 0 && r.get(0).get(0) != null) {
+ lengthsSeen.add(r.get(0).get(0) as int)
+ }
+ } catch (Throwable ignored) { /* meta row not visible yet
*/ }
+ try { Thread.sleep(150) } catch (InterruptedException ie)
{ break }
+ }
+ }
+
+ try {
+ Awaitility.await().atMost(600, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """SELECT COUNT(*) FROM
${currentDb}.${table1}"""
+ log.info("doris row count: ${cnt}")
+ cnt.size() == 1 && cnt.get(0).get(0) == totalRows
+ }
+ )
+ } catch (Exception ex) {
+ samplerStop.set(true)
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+ samplerStop.set(true)
+ sampler.join(5000)
+ log.info("chunk_list lengths observed during snapshot:
${lengthsSeen.toSorted()}")
+
+ assert lengthsSeen.contains(expectedChunks) :
+ "chunk_list never reached the full ${expectedChunks}
chunks, observed: ${lengthsSeen.toSorted()}"
Review Comment:
This assertion races with the background sampler: reaching `totalRows` only
proves the snapshot data loaded, not that the sampler's last 150ms poll
observed the final `chunk_list` length before `samplerStop` is set. A correct
run can therefore fail here. Please synchronously query
`json_length(chunk_list)` after snapshot completion, or await it reaches
`expectedChunks`, before stopping/asserting the sampler.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]