This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/34.0.0 by this push:
new b3f6a23eb60 MSQ: Fix composing channels losing partitionless frames.
(#18220) (#18267)
b3f6a23eb60 is described below
commit b3f6a23eb6015009e2bb763a739f24cabd9807f0
Author: Lucas Capistrant <[email protected]>
AuthorDate: Wed Jul 16 13:53:58 2025 -0500
MSQ: Fix composing channels losing partitionless frames. (#18220) (#18267)
* MSQ: Fix composing channels losing partitionless frames.
The main change is that single-partition ComposingWritableFrameChannels
(i.e.
those created by `ChannelOutputFactory#openChannel`) now associate all
incoming
frames with that partition. Previously, frames might have come in with
partition set to `NO_PARTITION`, which would cause them to get "lost" by
the composing channel.
Fixes a bug introduced in #18144 when composed intermediate stage-internal
channels started being used for the output of hash partitioning. Prior to
#18144, they were only used for internal channels of the SuperSorter. This
bug could cause frames to go missing during sortMerge joins.
This patch also adds an embedded test for various durable storage scenarios,
including sortMerge join tests that would have caught the original bug.
Finally, this patch adjusts the way that Calcites escapes string literals,
to use the actual characters more often when possible. This helps format
the test SQLs generated by the embedded test more nicely.
* dependency analyze.
---------
Co-authored-by: Gian Merlino <[email protected]>
Co-authored-by: Karan Kumar <[email protected]>
---
embedded-tests/pom.xml | 10 +
.../msq/EmbeddedDurableShuffleStorageTest.java | 396 +++++++++++++++++++++
.../embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 5 +-
.../embedded/msq/MinIODurableStorageResource.java | 82 +++++
.../channel/ComposingWritableFrameChannel.java | 30 +-
.../processor/ComposingOutputChannelFactory.java | 11 +
.../channel/ComposingWritableFrameChannelTest.java | 1 +
.../frame/processor/OutputChannelFactoryTest.java | 21 ++
.../testing/embedded/EmbeddedDruidServer.java | 12 +-
.../testing/embedded/ServerReferenceHolder.java | 12 +
.../testing/embedded/ServerReferencesProvider.java | 7 +
.../apache/druid/sql/calcite/planner/Calcites.java | 2 +-
.../druid/sql/calcite/planner/CalcitesTest.java | 21 +-
13 files changed, 602 insertions(+), 8 deletions(-)
diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index 13c8aeca155..1de13221c47 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -182,6 +182,16 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
new file mode 100644
index 00000000000..fd2b9bc535b
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartControllerModule;
+import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
+import org.apache.druid.msq.guice.MSQDurableStorageModule;
+import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.guice.MSQSqlModule;
+import org.apache.druid.msq.guice.SqlTaskModule;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.indexing.report.MSQStagesReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.DruidProcessingConfigTest;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.storage.s3.output.S3StorageConnectorModule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.testcontainers.shaded.com.google.common.io.ByteStreams;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Embedded test to batch-ingest wikipedia data from {@link
#loadWikipediaTable()}, then query that data
+ * with MSQ tasks using MinIO-based durable storage.
+ */
+public class EmbeddedDurableShuffleStorageTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedBroker broker = new EmbeddedBroker();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private final EmbeddedRouter router = new EmbeddedRouter();
+ private final MinIOStorageResource storageResource = new
MinIOStorageResource();
+ private final MinIODurableStorageResource msqStorageResource = new
MinIODurableStorageResource(storageResource);
+
+ private EmbeddedMSQApis msqApis;
+
+ @Override
+ public EmbeddedDruidCluster createCluster()
+ {
+ coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"always");
+
+ overlord.addProperty("druid.manager.segments.useIncrementalCache",
"always")
+ .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+ indexer.setServerMemory(400_000_000)
+ .addProperty("druid.worker.capacity", "4")
+ .addProperty("druid.processing.numThreads", "3")
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addExtension(DartControllerModule.class)
+ .addExtension(DartWorkerModule.class)
+ .addExtension(DartControllerMemoryManagementModule.class)
+ .addExtension(DartWorkerMemoryManagementModule.class)
+ .addExtension(IndexerMemoryManagementModule.class)
+ .addExtension(MSQDurableStorageModule.class)
+ .addExtension(MSQIndexingModule.class)
+ .addExtension(MSQSqlModule.class)
+ .addExtension(SqlTaskModule.class)
+ .addExtension(MSQExternalDataSourceModule.class)
+ .addExtension(S3StorageConnectorModule.class)
+ .addResource(storageResource)
+ .addResource(msqStorageResource)
+ .addServer(coordinator)
+ .addServer(overlord)
+ .addServer(indexer)
+ .addServer(broker)
+ .addServer(historical)
+ .addServer(router);
+ }
+
+ @BeforeEach
+ void setUpEach()
+ {
+ msqApis = new EmbeddedMSQApis(cluster, overlord);
+ dataSource = EmbeddedClusterApis.createTestDatasourceName();
+ }
+
+ @Test
+ @Timeout(120)
+ public void test_selectFirstPage() throws IOException
+ {
+ loadWikipediaTable();
+
+ final String sql = StringUtils.format(
+ "SET durableShuffleStorage = TRUE;\n"
+ + "SET targetPartitionsPerWorker = 3;\n"
+ + "SELECT __time, page \n"
+ + "FROM \"%s\"\n"
+ + "ORDER BY page ASC\n"
+ + "LIMIT 1",
+ dataSource
+ );
+
+ final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+ BaseCalciteQueryTest.assertResultsEquals(
+ sql,
+ Collections.singletonList(new
Object[]{DateTimes.of("2015-09-12T23:26:08.226Z").getMillis(), "!T.O.O.H.!"}),
+ payload.getResults().getResults()
+ );
+
+ assertStageOutputIsDurable(payload, false);
+ }
+
+ @Test
+ @Timeout(120)
+ public void test_selectCount() throws IOException
+ {
+ loadWikipediaTable();
+
+ final String sql = StringUtils.format(
+ "SET durableShuffleStorage = TRUE;\n"
+ + "SET targetPartitionsPerWorker = 3;\n"
+ + "SELECT COUNT(*) FROM \"%s\"",
+ dataSource
+ );
+
+ final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+ BaseCalciteQueryTest.assertResultsEquals(
+ sql,
+ Collections.singletonList(new Object[]{39244}),
+ payload.getResults().getResults()
+ );
+
+ assertStageOutputIsDurable(payload, false);
+ }
+
+ @Test
+ @Timeout(120)
+ public void test_selectJoin_broadcast() throws IOException
+ {
+ loadWikipediaTable();
+
+ final String sql = StringUtils.format(
+ "SET durableShuffleStorage = TRUE;\n"
+ + "SET sqlJoinAlgorithm = 'broadcast';\n"
+ + "SET targetPartitionsPerWorker = 3;\n"
+ + "SELECT channel, COUNT(*)"
+ + "FROM \"%s\""
+ + "WHERE channel IN (SELECT channel FROM \"%s\" GROUP BY 1 ORDER BY
COUNT(*) DESC LIMIT 5)"
+ + "GROUP BY channel\n"
+ + "ORDER BY channel",
+ dataSource,
+ dataSource
+ );
+
+ final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+ BaseCalciteQueryTest.assertResultsEquals(
+ sql,
+ List.of(
+ new Object[]{"#de.wikipedia", 2523},
+ new Object[]{"#en.wikipedia", 11549},
+ new Object[]{"#fr.wikipedia", 2099},
+ new Object[]{"#ru.wikipedia", 1386},
+ new Object[]{"#vi.wikipedia", 9747}
+ ),
+ payload.getResults().getResults()
+ );
+
+ assertStageOutputIsDurable(payload, false);
+ }
+
+ @Test
+ @Timeout(120)
+ public void test_selectJoin_sortMerge() throws IOException
+ {
+ loadWikipediaTable();
+
+ final String sql = StringUtils.format(
+ "SET durableShuffleStorage = TRUE;\n"
+ + "SET sqlJoinAlgorithm = 'sortMerge';\n"
+ + "SET targetPartitionsPerWorker = 3;\n"
+ + "SELECT channel, COUNT(*)"
+ + "FROM \"%s\""
+ + "WHERE channel IN (SELECT channel FROM \"%s\" GROUP BY 1 ORDER BY
COUNT(*) DESC LIMIT 5)"
+ + "GROUP BY channel\n"
+ + "ORDER BY channel",
+ dataSource,
+ dataSource
+ );
+
+ final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+ BaseCalciteQueryTest.assertResultsEquals(
+ sql,
+ List.of(
+ new Object[]{"#de.wikipedia", 2523},
+ new Object[]{"#en.wikipedia", 11549},
+ new Object[]{"#fr.wikipedia", 2099},
+ new Object[]{"#ru.wikipedia", 1386},
+ new Object[]{"#vi.wikipedia", 9747}
+ ),
+ payload.getResults().getResults()
+ );
+
+ assertStageOutputIsDurable(payload, false);
+ }
+
+ @Test
+ @Timeout(120)
+ public void test_selectJoin_sortMerge_durableDestination() throws IOException
+ {
+ loadWikipediaTable();
+
+ final String sql = StringUtils.format(
+ "SET durableShuffleStorage = TRUE;\n"
+ + "SET sqlJoinAlgorithm = 'sortMerge';\n"
+ + "SET targetPartitionsPerWorker = 3;\n"
+ + "SET selectDestination = 'durableStorage';\n" // Write results to
durable storage (instead of task report)
+ + "SET maxNumTasks = 3;\n" // Force two worker tasks
+ + "SET rowsPerPage = 3;\n" // Force two result partitions (one per
task) -- results have 5 items
+ + "SELECT channel, COUNT(*)"
+ + "FROM \"%s\""
+ + "WHERE channel IN (SELECT channel FROM \"%s\" GROUP BY 1 ORDER BY
COUNT(*) DESC LIMIT 5)"
+ + "GROUP BY channel\n"
+ + "ORDER BY channel",
+ dataSource,
+ dataSource
+ );
+
+ final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+ BaseCalciteQueryTest.assertResultsEquals(
+ sql,
+ List.of(
+ new Object[]{"#de.wikipedia", 2523},
+ new Object[]{"#en.wikipedia", 11549},
+ new Object[]{"#fr.wikipedia", 2099},
+ new Object[]{"#ru.wikipedia", 1386},
+ new Object[]{"#vi.wikipedia", 9747}
+ ),
+ payload.getResults().getResults()
+ );
+
+ assertStageOutputIsDurable(payload, true);
+
+ // Check that query results actually got written to durable storage.
+ final List<MSQStagesReport.Stage> stages = payload.getStages().getStages();
+ final String queryId =
stages.get(0).getStageDefinition().getId().getQueryId();
+ final String resultsBaseKey = StringUtils.format(
+ "%s/query-results/controller_%s",
+ msqStorageResource.getBaseKey(),
+ queryId
+ );
+
+ final List<S3ObjectSummary> queryResultObjects =
+ storageResource.getS3Client()
+ .listObjects(storageResource.getBucket(),
resultsBaseKey)
+ .getObjectSummaries();
+
+ final int lastStage = stages.size() - 1;
+ Assertions.assertEquals(
+ Set.of(
+ StringUtils.format("%s/stage_%s/worker_0/__success",
resultsBaseKey, lastStage),
+ StringUtils.format("%s/stage_%s/worker_1/__success",
resultsBaseKey, lastStage),
+
StringUtils.format("%s/stage_%s/worker_0/taskId_%s-worker0_0/part_0",
resultsBaseKey, lastStage, queryId),
+
StringUtils.format("%s/stage_%s/worker_1/taskId_%s-worker1_0/part_1",
resultsBaseKey, lastStage, queryId)
+ ),
+
queryResultObjects.stream().map(S3ObjectSummary::getKey).collect(Collectors.toSet())
+ );
+ }
+
+ /**
+ * Insert wikipedia dataset into the table {@link #dataSource}.
+ */
+ private void loadWikipediaTable() throws IOException
+ {
+ final File tmpDir = cluster.getTestFolder().newFolder();
+ final File wikiFile = new File(tmpDir, "wiki.gz");
+
+ ByteStreams.copy(
+
DruidProcessingConfigTest.class.getResourceAsStream("/wikipedia/wikiticker-2015-09-12-sampled.json.gz"),
+ Files.newOutputStream(wikiFile.toPath())
+ );
+
+ final String sql = StringUtils.format(
+ "SET durableShuffleStorage = TRUE;"
+ + "SET waitUntilSegmentsLoad = TRUE;\n"
+ + "REPLACE INTO \"%s\" OVERWRITE ALL\n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"time\") AS __time,\n"
+ + " channel,\n"
+ + " countryName,\n"
+ + " page,\n"
+ + " \"user\",\n"
+ + " added,\n"
+ + " deleted,\n"
+ + " delta\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " %s,\n"
+ + " '{\"type\":\"json\"}',\n"
+ + "
'[{\"name\":\"isRobot\",\"type\":\"string\"},{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"},{\"name\":\"flags\",\"type\":\"string\"},{\"name\":\"isUnpatrolled\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"diffUrl\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"long\"},{\"name\":\"comment\",\"type\":\"string\"},{\"name\":\"commentLength\",\"type\":\"long\"},{\"name\":\"isNew\",\"type\":\"string\"},{\"n
[...]
+ + " )\n"
+ + " )\n"
+ + "PARTITIONED BY DAY\n"
+ + "CLUSTERED BY channel",
+ dataSource,
+ Calcites.escapeStringLiteral(
+ broker.bindings()
+ .jsonMapper()
+ .writeValueAsString(new LocalInputSource(null, null,
Collections.singletonList(wikiFile), null))
+ )
+ );
+
+ final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+ Assertions.assertEquals(TaskState.SUCCESS,
payload.getStatus().getStatus());
+ Assertions.assertEquals(1,
payload.getStatus().getSegmentLoadWaiterStatus().getTotalSegments());
+ Assertions.assertNull(payload.getStatus().getErrorReport());
+ }
+
+ /**
+ * Asserts that all stages report output channels of type {@link
OutputChannelMode#DURABLE_STORAGE_INTERMEDIATE},
+ * or possibly {@link OutputChannelMode#DURABLE_STORAGE_QUERY_RESULTS} for
the final stage if {@code durableResults},
+ * i.e. the destination is {@link MSQSelectDestination#DURABLESTORAGE}.
+ */
+ private static void assertStageOutputIsDurable(
+ final MSQTaskReportPayload reportPayload,
+ final boolean durableResults
+ )
+ {
+ final List<MSQStagesReport.Stage> stages =
reportPayload.getStages().getStages();
+ MatcherAssert.assertThat(stages.size(), Matchers.greaterThanOrEqualTo(1));
+
+ for (final MSQStagesReport.Stage stage : stages) {
+ final OutputChannelMode expectedOutputChannelMode;
+ if (stage.getStageNumber() == stages.size() - 1 && durableResults) {
+ expectedOutputChannelMode =
OutputChannelMode.DURABLE_STORAGE_QUERY_RESULTS;
+ } else {
+ expectedOutputChannelMode =
OutputChannelMode.DURABLE_STORAGE_INTERMEDIATE;
+ }
+
+ Assertions.assertEquals(
+ expectedOutputChannelMode,
+ stage.getOutputChannelMode(),
+ StringUtils.format("OutputChannelMode for stage[%s]",
stage.getStageNumber())
+ );
+ }
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
index 3962f81202d..c2e770b4490 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
@@ -24,7 +24,6 @@ import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.emitter.kafka.KafkaEmitter;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
@@ -77,8 +76,8 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
- * Embedded test to emit cluster metrics using a {@link KafkaEmitter} and then
- * ingest them back into the cluster with a {@code KafkaSupervisor}.
+ * Embedded test to ingest {@link TestIndex#getMMappedWikipediaIndex()} into
Kafka tasks, then query
+ * those tasks with MSQ.
*/
public class EmbeddedMSQRealtimeQueryTest extends EmbeddedClusterTestBase
{
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MinIODurableStorageResource.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MinIODurableStorageResource.java
new file mode 100644
index 00000000000..8913e3e3ea4
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MinIODurableStorageResource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedResource;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+
+import java.io.File;
+
+/**
+ * Resource that configures MSQ to use a bucket from {@link
MinIOStorageResource} for durable intermediate data storage.
+ */
+public class MinIODurableStorageResource implements EmbeddedResource
+{
+ private final MinIOStorageResource storageResource;
+
+ public MinIODurableStorageResource(MinIOStorageResource storageResource)
+ {
+ this.storageResource = storageResource;
+ }
+
+ @Override
+ public void start()
+ {
+ // Nothing to do.
+ }
+
+ @Override
+ public void stop()
+ {
+ // Nothing to do.
+ }
+
+ @Override
+ public void onStarted(EmbeddedDruidCluster cluster)
+ {
+ final File intermediateTempDir =
cluster.getTestFolder().getOrCreateFolder("msq-shuffle-storage-tmp");
+ cluster.addCommonProperty("druid.msq.intermediate.storage.enable", "true");
+ cluster.addCommonProperty("druid.msq.intermediate.storage.type", "s3");
+ cluster.addCommonProperty("druid.msq.intermediate.storage.tempDir",
intermediateTempDir.getAbsolutePath());
+ cluster.addCommonProperty("druid.msq.intermediate.storage.bucket",
storageResource.getBucket());
+ cluster.addCommonProperty("druid.msq.intermediate.storage.prefix",
getBaseKey());
+
+ // Set tmpStorageBytesPerTask to 3 GB. This controls when stage-internal
channels (such as ones used internally
+ // by SuperSorter) "spill over" from local disk to durable storage. Cannot
set this much lower currently, due to
+ // validations in WorkerStorageParameters. Ideally, we'd like to set this
low enough such that embedded tests are
+ // actually using minio/S3 for stage-internal storage. At this level, they
won't be using it, but at least stages
+ // will go through the motions of setting up composing channels
internally. There is still value in exercising
+ // that code path.
+ //
+ // Note: this below property only controls the behavior of stage-internal
channels. With the above intermediate
+ // storage configs, stage *output* will *always* go to durable storage.
+ cluster.addCommonProperty("druid.indexer.task.tmpStorageBytesPerTask",
"3000000000");
+ }
+
+ /**
+ * Returns the value of {@code druid.msq.intermediate.storage.prefix}.
+ */
+ public String getBaseKey()
+ {
+ return StringUtils.format("%s/%s", storageResource.getBaseKey(),
"msq-intermediate");
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
index ed0f56c408a..03a2cf1a8a0 100644
---
a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
+++
b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
@@ -22,6 +22,7 @@ package org.apache.druid.frame.channel;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.error.DruidException;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.PartitionedOutputChannel;
import org.apache.druid.java.util.common.IAE;
@@ -41,6 +42,9 @@ import java.util.function.Supplier;
*/
public class ComposingWritableFrameChannel implements WritableFrameChannel
{
+ @Nullable
+ private final Integer partitionNumber;
+
@Nullable
private final List<Supplier<OutputChannel>> outputChannelSuppliers;
@@ -51,7 +55,19 @@ public class ComposingWritableFrameChannel implements
WritableFrameChannel
private final Map<Integer, HashSet<Integer>> partitionToChannelMap;
private int currentIndex;
+ /**
+ * Create a new channel.
+ *
+ * @param partitionNumber partition number for a
single-partition channel, or null for a
+ * multi-partition channel
+ * @param outputChannelSuppliers one supplier per composed
channel, if this is a single-partition channel
+ * @param partitionedOutputChannelSuppliers one supplier per composed
channel, if this is a multi-partition channel
+ * @param writableChannelSuppliers one supplier per composed channel
+ * @param partitionToChannelMap empty map that will be populated
with the set of valid channels
+ * for each partition
+ */
public ComposingWritableFrameChannel(
+ @Nullable Integer partitionNumber,
@Nullable List<Supplier<OutputChannel>> outputChannelSuppliers,
@Nullable List<Supplier<PartitionedOutputChannel>>
partitionedOutputChannelSuppliers,
List<Supplier<WritableFrameChannel>> writableChannelSuppliers,
@@ -61,6 +77,7 @@ public class ComposingWritableFrameChannel implements
WritableFrameChannel
if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers !=
null) {
throw new IAE("Atmost one of outputChannelSuppliers and
partitionedOutputChannelSuppliers can be provided");
}
+ this.partitionNumber = partitionNumber;
this.outputChannelSuppliers = outputChannelSuppliers;
this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers;
this.writableChannelSuppliers =
Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers
is null");
@@ -76,9 +93,20 @@ public class ComposingWritableFrameChannel implements
WritableFrameChannel
throw new ISE("No more channels available to write. Total available
channels : " + writableChannelSuppliers.size());
}
+ if (partitionNumber != null
+ && frameWithPartition.partition() != FrameWithPartition.NO_PARTITION
+ && frameWithPartition.partition() != partitionNumber) {
+ throw DruidException.defensive(
+ "Invalid partition number[%d], expected[%d]",
+ frameWithPartition.partition(),
+ partitionNumber
+ );
+ }
+
try {
+ final int writtenPartition = partitionNumber != null ? partitionNumber :
frameWithPartition.partition();
writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition);
- partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k
-> Sets.newHashSetWithExpectedSize(1))
+ partitionToChannelMap.computeIfAbsent(writtenPartition, k ->
Sets.newHashSetWithExpectedSize(1))
.add(currentIndex);
}
catch (ResourceLimitExceededException rlee) {
diff --git
a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
index 28aa07152fb..f11ea6430bf 100644
---
a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
+++
b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
@@ -83,6 +83,7 @@ public class ComposingOutputChannelFactory implements
OutputChannelFactory
// it is useful to identify the readable channels to open in the
composition while reading the partition data.
Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
ComposingWritableFrameChannel writableFrameChannel = new
ComposingWritableFrameChannel(
+ partitionNumber,
outputChannelSupplierBuilder.build(),
null,
writableFrameChannelSuppliersBuilder.build(),
@@ -131,6 +132,7 @@ public class ComposingOutputChannelFactory implements
OutputChannelFactory
Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
ComposingWritableFrameChannel writableFrameChannel = new
ComposingWritableFrameChannel(
+ null,
null,
partitionedOutputChannelSupplierBuilder.build(),
writableFrameChannelsBuilder.build(),
@@ -204,4 +206,13 @@ public class ComposingOutputChannelFactory implements
OutputChannelFactory
return false;
}
+
+ @Override
+ public String toString()
+ {
+ return "ComposingOutputChannelFactory{" +
+ "channelFactories=" + channelFactories +
+ ", frameSize=" + frameSize +
+ '}';
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
index eee8ce5e623..9c59cdc8a33 100644
---
a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
@@ -71,6 +71,7 @@ public class ComposingWritableFrameChannelTest
Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
ComposingWritableFrameChannel composingWritableFrameChannel = new
ComposingWritableFrameChannel(
+ null,
ImmutableList.of(
() -> outputChannel1,
() -> outputChannel2
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
index b8ea33893ea..c28e98d1ad4 100644
---
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
@@ -82,6 +82,27 @@ public abstract class OutputChannelFactoryTest extends
InitializedNullHandlingTe
Assert.assertEquals(frameSize,
channel.getFrameMemoryAllocator().capacity());
}
+ @Test
+ public void test_openChannel_noPartition() throws IOException,
ExecutionException, InterruptedException
+ {
+ OutputChannel channel = outputChannelFactory.openChannel(1);
+
+ Assert.assertEquals(1, channel.getPartitionNumber());
+
+ // write data to the channel
+ WritableFrameChannel writableFrameChannel = channel.getWritableChannel();
+ writableFrameChannel.writabilityFuture().get();
+ writableFrameChannel.write(frame);
+ writableFrameChannel.close();
+
+ // read back data from the channel
+ verifySingleFrameReadableChannel(
+ channel.getReadableChannel(),
+ sourceCursorFactory
+ );
+ Assert.assertEquals(frameSize,
channel.getFrameMemoryAllocator().capacity());
+ }
+
@Test
public void test_openPartitionedChannel() throws IOException,
ExecutionException, InterruptedException
{
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
index 374b5420113..c33247df2bf 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
@@ -113,7 +113,7 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
/**
* Sets the amount of heap memory visible to the server through {@link
RuntimeInfo}.
*/
- public final EmbeddedDruidServer setServerMemory(long serverMemory)
+ public final EmbeddedDruidServer<T> setServerMemory(long serverMemory)
{
this.serverMemory = serverMemory;
return this;
@@ -122,7 +122,7 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
/**
* Sets the amount of direct (off-heap) memory visible to the server through
{@link RuntimeInfo}.
*/
- public final EmbeddedDruidServer setServerDirectMemory(long
serverDirectMemory)
+ public final EmbeddedDruidServer<T> setServerDirectMemory(long
serverDirectMemory)
{
this.serverDirectMemory = serverDirectMemory;
return this;
@@ -232,6 +232,14 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
return referenceHolder.latchableEmitter();
}
+ @Override
+ public String toString()
+ {
+ return "EmbeddedDruidServer{" +
+ "name='" + name + '\'' +
+ '}';
+ }
+
/**
* Handler used to register the lifecycle of an embedded server.
*/
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
index 2c66ae845cc..d36d85693b1 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
@@ -19,6 +19,7 @@
package org.apache.druid.testing.embedded;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.client.coordinator.Coordinator;
@@ -27,6 +28,7 @@ import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -81,6 +83,10 @@ public final class ServerReferenceHolder implements
ServerReferencesProvider
@Inject
private DruidNode selfNode;
+ @Inject
+ @Json
+ private ObjectMapper jsonMapper;
+
@Override
public DruidNode selfNode()
{
@@ -146,4 +152,10 @@ public final class ServerReferenceHolder implements
ServerReferencesProvider
{
return httpClient;
}
+
+ @Override
+ public ObjectMapper jsonMapper()
+ {
+ return jsonMapper;
+ }
}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
index 671fb6aa305..6eeed490e1e 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
@@ -19,10 +19,12 @@
package org.apache.druid.testing.embedded;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.metadata.SQLMetadataConnector;
@@ -92,4 +94,9 @@ public interface ServerReferencesProvider
* {@link HttpClient} used by this server to communicate with other Druid
servers.
*/
HttpClient escalatedHttpClient();
+
+ /**
+ * {@link ObjectMapper} annotated with {@link Json}.
+ */
+ ObjectMapper jsonMapper();
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
index 7f90f0a9aa2..0c1a82f5170 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
@@ -126,7 +126,7 @@ public class Calcites
final StringBuilder builder = new StringBuilder("'");
for (int i = 0; i < s.length(); i++) {
final char c = s.charAt(i);
- if (Character.isLetterOrDigit(c) || c == ' ') {
+ if (Character.isLetterOrDigit(c) || (c >= 32 && c < 127 && c != '\'' &&
c != '\\')) {
builder.append(c);
if (c > 127) {
isPlainAscii = false;
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
index 0b82ee4f786..8c5f57e81d6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
@@ -40,12 +40,31 @@ public class CalcitesTest extends CalciteTestBase
assertEquals("'foo'", Calcites.escapeStringLiteral("foo"));
assertEquals("'foo bar'", Calcites.escapeStringLiteral("foo bar"));
assertEquals("U&'foö bar'", Calcites.escapeStringLiteral("foö bar"));
- assertEquals("U&'foo \\0026\\0026 bar'", Calcites.escapeStringLiteral("foo
&& bar"));
+ assertEquals("'foo && bar'", Calcites.escapeStringLiteral("foo && bar"));
assertEquals("U&'foo \\005C bar'", Calcites.escapeStringLiteral("foo \\
bar"));
assertEquals("U&'foo\\0027s bar'", Calcites.escapeStringLiteral("foo's
bar"));
assertEquals("U&'друид'", Calcites.escapeStringLiteral("друид"));
}
+ @Test
+ public void testEscapeStringLiteralAllAscii()
+ {
+ final StringBuilder sb = new StringBuilder();
+ for (int i = 0; i <= 127; i++) {
+ sb.append((char) i);
+ }
+ final String allAscii = sb.toString();
+ final String result = Calcites.escapeStringLiteral(allAscii);
+
+ // Verify the result is properly escaped and contains all ASCII characters
+ assertEquals(
+
"U&'\\0000\\0001\\0002\\0003\\0004\\0005\\0006\\0007\\0008\\0009\\000A\\000B\\000C\\000D\\000E\\000F\\0010"
+ +
"\\0011\\0012\\0013\\0014\\0015\\0016\\0017\\0018\\0019\\001A\\001B\\001C\\001D\\001E\\001F
!\"#$%&\\0027()"
+ +
"*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\005C]^_`abcdefghijklmnopqrstuvwxyz{|}~\\007F'",
+ result
+ );
+ }
+
@Test
public void testFindUnusedPrefix()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]