This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f546cd64a9 MSQ: Ensure that the allocated segment aligns with the
requested granularity (#14475)
f546cd64a9 is described below
commit f546cd64a9640ed14c9c0f44481678d39530e3b5
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Jun 27 09:25:32 2023 +0530
MSQ: Ensure that the allocated segment aligns with the requested
granularity (#14475)
Changes:
- Throw an `InsertCannotAllocateSegmentFault` if the allocated segment is not
aligned with the requested granularity.
- Add tests to verify the new behaviour.
---
docs/multi-stage-query/reference.md | 2 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 17 +++-
.../error/InsertCannotAllocateSegmentFault.java | 70 ++++++++++++++-
.../org/apache/druid/msq/util/IntervalUtils.java | 22 ++++-
.../org/apache/druid/msq/exec/MSQFaultsTest.java | 54 +++++++++++-
.../msq/indexing/error/MSQFaultSerdeTest.java | 7 +-
.../apache/druid/msq/util/IntervalUtilsTest.java | 99 ++++++++++++++++++++++
.../java/util/common/granularity/Granularity.java | 3 +-
8 files changed, 262 insertions(+), 12 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index d56d86964f..ec6a5b1543 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -423,7 +423,7 @@ The following table describes error codes you may encounter
in the `multiStageQu
| <a name="error_CannotParseExternalData">`CannotParseExternalData`</a> | A
worker task could not parse data from an external datasource. | `errorMessage`:
More details on why parsing failed. |
| <a name="error_ColumnNameRestricted">`ColumnNameRestricted`</a> | The query
uses a restricted column name. | `columnName`: The restricted column name. |
| <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The
column type is not supported. This can be because:<br /> <br /><ul><li>Support
for writing or reading from a particular column type is not
supported.</li><li>The query attempted to use a column type that is not
supported by the frame format. This occurs with ARRAY types, which are not yet
implemented for frames.</li></ul> | `columnName`: The column name with an
unsupported type.<br /> <br />`columnType`: The unkn [...]
-| <a
name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> |
The controller task could not allocate a new segment ID due to conflict with
existing segments or pending segments. Common reasons for such conflicts:<br />
<br /><ul><li>Attempting to mix different granularities in the same intervals
of the same datasource.</li><li>Prior ingestions that used non-extendable shard
specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the
attempted new segme [...]
+| <a
name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> |
The controller task could not allocate a new segment ID due to conflict with
existing segments or pending segments. Common reasons for such conflicts:<br />
<br /><ul><li>Attempting to mix different granularities in the same intervals
of the same datasource.</li><li>Prior ingestions that used non-extendable shard
specs.</li></ul> <br /> <br /> Use REPLACE to overwrite the existing data or if
the error conta [...]
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or
REPLACE query did not generate any output rows in a situation where output rows
are required for success. This can happen for INSERT or REPLACE queries with
`PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. |
`dataSource` |
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or
REPLACE query was canceled by a higher-priority ingestion job, such as a
real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE
query encountered a null timestamp in the `__time` field.<br /><br />This can
happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a
timestamp that cannot be parsed.
([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null
when it cannot parse a timestamp.) In this case, try parsing your timestamps
using a different function or pattern. Or, if your timestamps may g [...]
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index dd163d406d..db5e699717 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -940,7 +940,22 @@ public class ControllerImpl implements Controller
throw new MSQException(
new InsertCannotAllocateSegmentFault(
task.getDataSource(),
- segmentGranularity.bucket(timestamp)
+ segmentGranularity.bucket(timestamp),
+ null
+ )
+ );
+ }
+
+ // Even if allocation isn't null, the overlord makes the best effort job
of allocating a segment with the given
+ // segmentGranularity. This is commonly seen in case when there is
already a coarser segment in the interval where
+ // the requested segment is present and that segment completely overlaps
the request. Throw an error if the interval
+ // doesn't match the granularity requested
+ if (!IntervalUtils.isAligned(allocation.getInterval(),
segmentGranularity)) {
+ throw new MSQException(
+ new InsertCannotAllocateSegmentFault(
+ task.getDataSource(),
+ segmentGranularity.bucket(timestamp),
+ allocation.getInterval()
)
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
index 403af37d9b..f632ae6773 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
@@ -23,8 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.DurationGranularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.Objects;
@JsonTypeName(InsertCannotAllocateSegmentFault.CODE)
@@ -35,15 +39,20 @@ public class InsertCannotAllocateSegmentFault extends
BaseMSQFault
private final String dataSource;
private final Interval interval;
+ @Nullable
+ private final Interval allocatedInterval;
+
@JsonCreator
public InsertCannotAllocateSegmentFault(
@JsonProperty("dataSource") final String dataSource,
- @JsonProperty("interval") final Interval interval
+ @JsonProperty("interval") final Interval interval,
+ @Nullable @JsonProperty("allocatedInterval") final Interval
allocatedInterval
)
{
- super(CODE, "Cannot allocate segment for dataSource [%s], interval [%s]",
dataSource, interval);
+ super(CODE, getErrorMessage(dataSource, interval, allocatedInterval));
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Preconditions.checkNotNull(interval, "interval");
+ this.allocatedInterval = allocatedInterval;
}
@JsonProperty
@@ -58,6 +67,57 @@ public class InsertCannotAllocateSegmentFault extends
BaseMSQFault
return interval;
}
+ @Nullable
+ @JsonProperty
+ public Interval getAllocatedInterval()
+ {
+ return allocatedInterval;
+ }
+
+ private static String getErrorMessage(
+ final String dataSource,
+ final Interval interval,
+ @Nullable final Interval allocatedInterval
+ )
+ {
+ String errorMessage;
+ if (allocatedInterval == null) {
+ errorMessage = StringUtils.format(
+ "Cannot allocate segment for dataSource [%s], interval [%s]. This
can happen if the prior ingestion "
+ + "uses non-extendable shard specs or if the partitioned by
granularity is different from the granularity of the "
+ + "pre-existing segments. Check the granularities of the
pre-existing segments or re-run the ingestion with REPLACE "
+ + "to overwrite over the existing data",
+ dataSource,
+ interval
+ );
+ } else {
+ errorMessage = StringUtils.format(
+ "Requested segment for dataSource [%s], interval [%s], but got [%s]
interval instead. "
+ + "This happens when an overlapping segment is already present with
a coarser granularity for the requested interval. "
+ + "Either set the partition granularity for the INSERT to [%s] to
append to existing data or use REPLACE to "
+ + "overwrite over the pre-existing segment",
+ dataSource,
+ interval,
+ allocatedInterval,
+ convertIntervalToGranularityString(allocatedInterval)
+ );
+ }
+ return errorMessage;
+ }
+
+ /**
+ * Converts the given interval to a string representing the granularity
which is more user-friendly.
+ */
+ private static String convertIntervalToGranularityString(final Interval
interval)
+ {
+ try {
+ return GranularityType.fromPeriod(interval.toPeriod()).name();
+ }
+ catch (Exception e) {
+ return new DurationGranularity(interval.toDurationMillis(),
null).toString();
+ }
+ }
+
@Override
public boolean equals(Object o)
{
@@ -71,12 +131,14 @@ public class InsertCannotAllocateSegmentFault extends
BaseMSQFault
return false;
}
InsertCannotAllocateSegmentFault that = (InsertCannotAllocateSegmentFault)
o;
- return Objects.equals(dataSource, that.dataSource) &&
Objects.equals(interval, that.interval);
+ return Objects.equals(dataSource, that.dataSource)
+ && Objects.equals(interval, that.interval)
+ && Objects.equals(allocatedInterval, that.allocatedInterval);
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), dataSource, interval);
+ return Objects.hash(super.hashCode(), dataSource, interval,
allocatedInterval);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
index 43a844b5a6..328a0c0b74 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
@@ -19,13 +19,16 @@
package org.apache.druid.msq.util;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;
/**
- * Things that would make sense in {@link
org.apache.druid.java.util.common.Intervals} if this were not an extension.
+ * Things that would make sense in {@link Intervals} if this were not an
extension.
*/
public class IntervalUtils
{
@@ -61,4 +64,21 @@ public class IntervalUtils
return retVal;
}
+
+ /**
+ * This method checks if the provided interval is aligned by the granularity
or is an instance of {@link Intervals#ETERNITY}
+ * This is used to check if the granularity allocation made by the overlord
is the same as the one requested in the
+ * SQL query
+ */
+ public static boolean isAligned(
+ final Interval interval,
+ final Granularity granularity
+ )
+ {
+ // AllGranularity needs special handling since AllGranularity#isAligned
always returns false
+ if (granularity instanceof AllGranularity) {
+ return Intervals.isEternity(interval);
+ }
+ return granularity.isAligned(interval);
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 646286acaf..cfcc107333 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -36,6 +36,8 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Test;
import org.mockito.Mockito;
@@ -51,7 +53,7 @@ import static org.mockito.ArgumentMatchers.isA;
public class MSQFaultsTest extends MSQTestBase
{
@Test
- public void testInsertCannotAllocateSegmentFault()
+ public void testInsertCannotAllocateSegmentFaultWhenNullAllocation()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -62,18 +64,64 @@ public class MSQFaultsTest extends MSQTestBase
Mockito.doReturn(null).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));
testIngestQuery().setSql(
- "insert into foo1 select __time, dim1 , count(*) as
cnt from foo where dim1 is not null and __time >= TIMESTAMP '2000-01-02
00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00' group by 1, 2
PARTITIONED by day clustered by dim1")
+ "insert into foo1"
+ + " select __time, dim1 , count(*) as cnt"
+ + " from foo"
+ + " where dim1 is not null and __time >= TIMESTAMP
'2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+ + " group by 1, 2"
+ + " PARTITIONED by day"
+ + " clustered by dim1"
+ )
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(
new InsertCannotAllocateSegmentFault(
"foo1",
-
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")
+
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
+ null
)
)
.verifyResults();
}
+ @Test
+ public void testInsertCannotAllocateSegmentFaultWhenInvalidAllocation()
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt",
ColumnType.LONG).build();
+
+ // Simulate the overlord returning a coarser (MONTH) allocation than the
requested DAY granularity.
+ Mockito.doReturn(new SegmentIdWithShardSpec(
+ "foo1",
+ Intervals.of("2000-01-01/2000-02-01"),
+ "test",
+ new LinearShardSpec(2)
+ )).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));
+
+ testIngestQuery().setSql(
+ "insert into foo1"
+ + " select __time, dim1 , count(*) as cnt"
+ + " from foo"
+ + " where dim1 is not null and __time >= TIMESTAMP
'2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+ + " group by 1, 2"
+ + " PARTITIONED by day"
+ + " clustered by dim1"
+ )
+ .setExpectedDataSource("foo1")
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedMSQFault(
+ new InsertCannotAllocateSegmentFault(
+ "foo1",
+
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
+
Intervals.of("2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z")
+ )
+ )
+ .verifyResults();
+ }
+
+
@Test
public void testInsertCannotBeEmptyFault()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index c2ebb2d25a..bcd0de1ef6 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -54,7 +54,12 @@ public class MSQFaultSerdeTest
assertFaultSerde(new ColumnTypeNotSupportedFault("the column", null));
assertFaultSerde(new ColumnTypeNotSupportedFault("the column",
ColumnType.STRING_ARRAY));
assertFaultSerde(new ColumnNameRestrictedFault("the column"));
- assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource",
Intervals.ETERNITY));
+ assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource",
Intervals.ETERNITY, null));
+ assertFaultSerde(new InsertCannotAllocateSegmentFault(
+ "the datasource",
+ Intervals.of("2000-01-01/2002-01-01"),
+ Intervals.ETERNITY
+ ));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
index 6dd47313e7..03371f8b90 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
@@ -19,8 +19,12 @@
package org.apache.druid.msq.util;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.Interval;
+import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@@ -84,6 +88,101 @@ public class IntervalUtilsTest
);
}
+ @Test
+ public void test_doesIntervalMatchesGranularity_withStandardGranularities()
+ {
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(Intervals.ETERNITY, Granularities.ALL)
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2001-01-01"), Granularities.YEAR
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-04-01"), Granularities.QUARTER
+ )
+ );
+
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-02-01"), Granularities.MONTH
+ )
+ );
+
+ // With the way WEEK granularities work, this needs to be aligned to an
actual week
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("1999-12-27/2000-01-03"), Granularities.WEEK
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-01-02"), Granularities.DAY
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T08:00:00.000"),
Granularities.EIGHT_HOUR
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T01:00:00.000"),
Granularities.HOUR
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:01:00.000"),
Granularities.MINUTE
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:00:01.000"),
Granularities.SECOND
+ )
+ );
+
+ Assert.assertFalse(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2002-01-01"), Granularities.YEAR
+ )
+ );
+
+ Assert.assertFalse(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2002-01-08"), Granularities.YEAR
+ )
+ );
+ }
+
+ @Test
+ public void test_doesIntervalMatchesGranularity_withPeriodGranularity()
+ {
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-01-04"),
+ new PeriodGranularity(new Period("P3D"),
DateTimes.of("2000-01-01"), null)
+ )
+ );
+
+ Assert.assertFalse(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-01-04"),
+ new PeriodGranularity(new Period("P3D"),
DateTimes.of("2000-01-02"), null)
+ )
+ );
+ }
+
public static List<Interval> intervals(final String... intervals)
{
return
Arrays.stream(intervals).map(Intervals::of).collect(Collectors.toList());
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
index 812538ae6e..6f5d9fb61e 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
@@ -145,7 +145,8 @@ public abstract class Granularity implements Cacheable
public abstract DateTime toDate(String filePath, Formatter formatter);
/**
- * Return true if time chunks populated by this granularity includes the
given interval time chunk.
+ * Return true only if the time chunks populated by this granularity
includes the given interval time chunk. The
+ * interval must fit exactly into the scheme of the granularity for this to
return true
*/
public abstract boolean isAligned(Interval interval);
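
The "must fit exactly" contract described in the Javadoc above can be illustrated with a minimal, standalone sketch. It is not Druid code: `AlignmentSketch` and its `isAligned` method are hypothetical names, it uses `java.time` rather than Joda-Time, and it assumes a fixed-length UTC bucket (so it does not cover MONTH, YEAR, or time-zone-aware granularities). The two calls mirror the DAY request and MONTH allocation used in `MSQFaultsTest`.

```java
import java.time.Duration;
import java.time.Instant;

public class AlignmentSketch
{
  /**
   * Hypothetical stand-in for an alignment check with a fixed-length bucket:
   * true only when the interval starts on a bucket boundary (measured from the
   * epoch) and spans exactly one bucket.
   */
  public static boolean isAligned(Instant start, Instant end, Duration bucket)
  {
    long bucketMillis = bucket.toMillis();
    long startMillis = start.toEpochMilli();
    return Math.floorMod(startMillis, bucketMillis) == 0
           && end.toEpochMilli() - startMillis == bucketMillis;
  }

  public static void main(String[] args)
  {
    Duration day = Duration.ofDays(1);

    // Interval requested by PARTITIONED BY DAY: exactly one day bucket.
    System.out.println(isAligned(
        Instant.parse("2000-01-02T00:00:00Z"),
        Instant.parse("2000-01-03T00:00:00Z"),
        day
    ));

    // Month-long interval handed back instead: starts on a day boundary,
    // but spans 31 buckets, so it is not aligned with DAY.
    System.out.println(isAligned(
        Instant.parse("2000-01-01T00:00:00Z"),
        Instant.parse("2000-02-01T00:00:00Z"),
        day
    ));
  }
}
```

Under these assumptions the first call reports an aligned interval and the second does not, which is the situation in which the controller now raises `InsertCannotAllocateSegmentFault` with a non-null `allocatedInterval`.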