This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f546cd64a9 MSQ: Ensure that the allocated segment aligns with the requested granularity (#14475)
f546cd64a9 is described below

commit f546cd64a9640ed14c9c0f44481678d39530e3b5
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Jun 27 09:25:32 2023 +0530

    MSQ: Ensure that the allocated segment aligns with the requested granularity (#14475)
    
    Changes:
    - Throw an `InsertCannotAllocateSegmentFault` if the allocated segment is not aligned with the requested granularity.
    - Tests to verify new behaviour
---
 docs/multi-stage-query/reference.md                |  2 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 17 +++-
 .../error/InsertCannotAllocateSegmentFault.java    | 70 ++++++++++++++-
 .../org/apache/druid/msq/util/IntervalUtils.java   | 22 ++++-
 .../org/apache/druid/msq/exec/MSQFaultsTest.java   | 54 +++++++++++-
 .../msq/indexing/error/MSQFaultSerdeTest.java      |  7 +-
 .../apache/druid/msq/util/IntervalUtilsTest.java   | 99 ++++++++++++++++++++++
 .../java/util/common/granularity/Granularity.java  |  3 +-
 8 files changed, 262 insertions(+), 12 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index d56d86964f..ec6a5b1543 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -423,7 +423,7 @@ The following table describes error codes you may encounter 
in the `multiStageQu
 | <a name="error_CannotParseExternalData">`CannotParseExternalData`</a> | A 
worker task could not parse data from an external datasource. | `errorMessage`: 
More details on why parsing failed. |
 | <a name="error_ColumnNameRestricted">`ColumnNameRestricted`</a> | The query 
uses a restricted column name. | `columnName`: The restricted column name. |
 | <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The 
column type is not supported. This can be because:<br /> <br /><ul><li>Support 
for writing or reading from a particular column type is not 
supported.</li><li>The query attempted to use a column type that is not 
supported by the frame format. This occurs with ARRAY types, which are not yet 
implemented for frames.</li></ul> | `columnName`: The column name with an 
unsupported type.<br /> <br />`columnType`: The unkn [...]
-| <a 
name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | 
The controller task could not allocate a new segment ID due to conflict with 
existing segments or pending segments. Common reasons for such conflicts:<br /> 
<br /><ul><li>Attempting to mix different granularities in the same intervals 
of the same datasource.</li><li>Prior ingestions that used non-extendable shard 
specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the 
attempted new segme [...]
+| <a 
name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | 
The controller task could not allocate a new segment ID due to conflict with 
existing segments or pending segments. Common reasons for such conflicts:<br /> 
<br /><ul><li>Attempting to mix different granularities in the same intervals 
of the same datasource.</li><li>Prior ingestions that used non-extendable shard 
specs.</li></ul> <br /> <br /> Use REPLACE to overwrite the existing data or if 
the error conta [...]
 | <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or 
REPLACE query did not generate any output rows in a situation where output rows 
are required for success. This can happen for INSERT or REPLACE queries with 
`PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | 
`dataSource` |
 | <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or 
REPLACE query was canceled by a higher-priority ingestion job, such as a 
real-time ingestion task. | |
 | <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE 
query encountered a null timestamp in the `__time` field.<br /><br />This can 
happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a 
timestamp that cannot be parsed. 
([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null 
when it cannot parse a timestamp.) In this case, try parsing your timestamps 
using a different function or pattern. Or, if your timestamps may g [...]
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index dd163d406d..db5e699717 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -940,7 +940,22 @@ public class ControllerImpl implements Controller
         throw new MSQException(
             new InsertCannotAllocateSegmentFault(
                 task.getDataSource(),
-                segmentGranularity.bucket(timestamp)
+                segmentGranularity.bucket(timestamp),
+                null
+            )
+        );
+      }
+
+      // Even if the allocation isn't null, the overlord only makes a best-effort attempt to allocate a segment with the
+      // given segmentGranularity. A mismatch commonly occurs when a coarser segment already exists in the interval and
+      // completely overlaps the requested segment. Throw an error if the allocated interval doesn't match the
+      // requested granularity.
+      if (!IntervalUtils.isAligned(allocation.getInterval(), 
segmentGranularity)) {
+        throw new MSQException(
+            new InsertCannotAllocateSegmentFault(
+                task.getDataSource(),
+                segmentGranularity.bucket(timestamp),
+                allocation.getInterval()
             )
         );
       }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
index 403af37d9b..f632ae6773 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
@@ -23,8 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.DurationGranularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 @JsonTypeName(InsertCannotAllocateSegmentFault.CODE)
@@ -35,15 +39,20 @@ public class InsertCannotAllocateSegmentFault extends 
BaseMSQFault
   private final String dataSource;
   private final Interval interval;
 
+  @Nullable
+  private final Interval allocatedInterval;
+
   @JsonCreator
   public InsertCannotAllocateSegmentFault(
       @JsonProperty("dataSource") final String dataSource,
-      @JsonProperty("interval") final Interval interval
+      @JsonProperty("interval") final Interval interval,
+      @Nullable @JsonProperty("allocatedInterval") final Interval 
allocatedInterval
   )
   {
-    super(CODE, "Cannot allocate segment for dataSource [%s], interval [%s]", 
dataSource, interval);
+    super(CODE, getErrorMessage(dataSource, interval, allocatedInterval));
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.interval = Preconditions.checkNotNull(interval, "interval");
+    this.allocatedInterval = allocatedInterval;
   }
 
   @JsonProperty
@@ -58,6 +67,57 @@ public class InsertCannotAllocateSegmentFault extends 
BaseMSQFault
     return interval;
   }
 
+  @Nullable
+  @JsonProperty
+  public Interval getAllocatedInterval()
+  {
+    return allocatedInterval;
+  }
+
+  private static String getErrorMessage(
+      final String dataSource,
+      final Interval interval,
+      @Nullable final Interval allocatedInterval
+  )
+  {
+    String errorMessage;
+    if (allocatedInterval == null) {
+      errorMessage = StringUtils.format(
+          "Cannot allocate segment for dataSource [%s], interval [%s]. This 
can happen if the prior ingestion "
+          + "uses non-extendable shard specs or if the partitioned by 
granularity is different from the granularity of the "
+          + "pre-existing segments. Check the granularities of the 
pre-existing segments or re-run the ingestion with REPLACE "
+          + "to overwrite over the existing data",
+          dataSource,
+          interval
+      );
+    } else {
+      errorMessage = StringUtils.format(
+          "Requested segment for dataSource [%s], interval [%s], but got [%s] 
interval instead. "
+          + "This happens when an overlapping segment is already present with 
a coarser granularity for the requested interval. "
+          + "Either set the partition granularity for the INSERT to [%s] to 
append to existing data or use REPLACE to "
+          + "overwrite over the pre-existing segment",
+          dataSource,
+          interval,
+          allocatedInterval,
+          convertIntervalToGranularityString(allocatedInterval)
+      );
+    }
+    return errorMessage;
+  }
+
+  /**
+   * Converts the given interval to a string representing the granularity 
which is more user-friendly.
+   */
+  private static String convertIntervalToGranularityString(final Interval 
interval)
+  {
+    try {
+      return GranularityType.fromPeriod(interval.toPeriod()).name();
+    }
+    catch (Exception e) {
+      return new DurationGranularity(interval.toDurationMillis(), 
null).toString();
+    }
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -71,12 +131,14 @@ public class InsertCannotAllocateSegmentFault extends 
BaseMSQFault
       return false;
     }
     InsertCannotAllocateSegmentFault that = (InsertCannotAllocateSegmentFault) 
o;
-    return Objects.equals(dataSource, that.dataSource) && 
Objects.equals(interval, that.interval);
+    return Objects.equals(dataSource, that.dataSource)
+           && Objects.equals(interval, that.interval)
+           && Objects.equals(allocatedInterval, that.allocatedInterval);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), dataSource, interval);
+    return Objects.hash(super.hashCode(), dataSource, interval, 
allocatedInterval);
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
index 43a844b5a6..328a0c0b74 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
@@ -19,13 +19,16 @@
 
 package org.apache.druid.msq.util;
 
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.joda.time.Interval;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Things that would make sense in {@link 
org.apache.druid.java.util.common.Intervals} if this were not an extension.
+ * Things that would make sense in {@link Intervals} if this were not an 
extension.
  */
 public class IntervalUtils
 {
@@ -61,4 +64,21 @@ public class IntervalUtils
 
     return retVal;
   }
+
+  /**
+   * This method checks if the provided interval is aligned by the granularity 
or is an instance of {@link Intervals#ETERNITY}
+   * This is used to check if the granularity allocation made by the overlord 
is the same as the one requested in the
+   * SQL query
+   */
+  public static boolean isAligned(
+      final Interval interval,
+      final Granularity granularity
+  )
+  {
+    // AllGranularity needs special handling since AllGranularity#isAligned always returns false
+    if (granularity instanceof AllGranularity) {
+      return Intervals.isEternity(interval);
+    }
+    return granularity.isAligned(interval);
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 646286acaf..cfcc107333 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -36,6 +36,8 @@ import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -51,7 +53,7 @@ import static org.mockito.ArgumentMatchers.isA;
 public class MSQFaultsTest extends MSQTestBase
 {
   @Test
-  public void testInsertCannotAllocateSegmentFault()
+  public void testInsertCannotAllocateSegmentFaultWhenNullAllocation()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -62,18 +64,64 @@ public class MSQFaultsTest extends MSQTestBase
     
Mockito.doReturn(null).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));
 
     testIngestQuery().setSql(
-                         "insert into foo1 select  __time, dim1 , count(*) as 
cnt from foo where dim1 is not null and __time >= TIMESTAMP '2000-01-02 
00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00' group by 1, 2 
PARTITIONED by day clustered by dim1")
+                         "insert into foo1"
+                         + " select  __time, dim1 , count(*) as cnt"
+                         + " from foo"
+                         + " where dim1 is not null and __time >= TIMESTAMP 
'2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+                         + " group by 1, 2"
+                         + " PARTITIONED by day"
+                         + " clustered by dim1"
+                     )
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
                      .setExpectedMSQFault(
                          new InsertCannotAllocateSegmentFault(
                              "foo1",
-                             
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")
+                             
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
+                             null
                          )
                      )
                      .verifyResults();
   }
 
+  @Test
+  public void testInsertCannotAllocateSegmentFaultWhenInvalidAllocation()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("cnt", 
ColumnType.LONG).build();
+
+    // Simulate the task action client returning a non-null allocation whose interval (MONTH) is coarser than the requested DAY granularity.
+    Mockito.doReturn(new SegmentIdWithShardSpec(
+        "foo1",
+        Intervals.of("2000-01-01/2000-02-01"),
+        "test",
+        new LinearShardSpec(2)
+    )).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));
+
+    testIngestQuery().setSql(
+                         "insert into foo1"
+                         + " select  __time, dim1 , count(*) as cnt"
+                         + " from foo"
+                         + " where dim1 is not null and __time >= TIMESTAMP 
'2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+                         + " group by 1, 2"
+                         + " PARTITIONED by day"
+                         + " clustered by dim1"
+                     )
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedMSQFault(
+                         new InsertCannotAllocateSegmentFault(
+                             "foo1",
+                             
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
+                             
Intervals.of("2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z")
+                         )
+                     )
+                     .verifyResults();
+  }
+
+
   @Test
   public void testInsertCannotBeEmptyFault()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index c2ebb2d25a..bcd0de1ef6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -54,7 +54,12 @@ public class MSQFaultSerdeTest
     assertFaultSerde(new ColumnTypeNotSupportedFault("the column", null));
     assertFaultSerde(new ColumnTypeNotSupportedFault("the column", 
ColumnType.STRING_ARRAY));
     assertFaultSerde(new ColumnNameRestrictedFault("the column"));
-    assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", 
Intervals.ETERNITY));
+    assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", 
Intervals.ETERNITY, null));
+    assertFaultSerde(new InsertCannotAllocateSegmentFault(
+        "the datasource",
+        Intervals.of("2000-01-01/2002-01-01"),
+        Intervals.ETERNITY
+    ));
     assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
     assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
     assertFaultSerde(InsertTimeNullFault.INSTANCE);
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
index 6dd47313e7..03371f8b90 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
@@ -19,8 +19,12 @@
 
 package org.apache.druid.msq.util;
 
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -84,6 +88,101 @@ public class IntervalUtilsTest
     );
   }
 
+  @Test
+  public void test_doesIntervalMatchesGranularity_withStandardGranularities()
+  {
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(Intervals.ETERNITY, Granularities.ALL)
+    );
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2001-01-01"), Granularities.YEAR
+        )
+    );
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2000-04-01"), Granularities.QUARTER
+        )
+    );
+
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2000-02-01"), Granularities.MONTH
+        )
+    );
+
+    // With the way WEEK granularities work, this needs to be aligned to an 
actual week
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("1999-12-27/2000-01-03"), Granularities.WEEK
+        )
+    );
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2000-01-02"), Granularities.DAY
+        )
+    );
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01T00:00:00.000/2000-01-01T08:00:00.000"), 
Granularities.EIGHT_HOUR
+        )
+    );
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01T00:00:00.000/2000-01-01T01:00:00.000"), 
Granularities.HOUR
+        )
+    );
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:01:00.000"), 
Granularities.MINUTE
+        )
+    );
+
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:00:01.000"), 
Granularities.SECOND
+        )
+    );
+
+    Assert.assertFalse(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2002-01-01"), Granularities.YEAR
+        )
+    );
+
+    Assert.assertFalse(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2002-01-08"), Granularities.YEAR
+        )
+    );
+  }
+
+  @Test
+  public void test_doesIntervalMatchesGranularity_withPeriodGranularity()
+  {
+    Assert.assertTrue(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2000-01-04"),
+            new PeriodGranularity(new Period("P3D"), 
DateTimes.of("2000-01-01"), null)
+        )
+    );
+
+    Assert.assertFalse(
+        IntervalUtils.isAligned(
+            Intervals.of("2000-01-01/2000-01-04"),
+            new PeriodGranularity(new Period("P3D"), 
DateTimes.of("2000-01-02"), null)
+        )
+    );
+  }
+
   public static List<Interval> intervals(final String... intervals)
   {
     return 
Arrays.stream(intervals).map(Intervals::of).collect(Collectors.toList());
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
index 812538ae6e..6f5d9fb61e 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
@@ -145,7 +145,8 @@ public abstract class Granularity implements Cacheable
   public abstract DateTime toDate(String filePath, Formatter formatter);
 
   /**
-   * Return true if time chunks populated by this granularity includes the 
given interval time chunk.
+   * Return true only if the time chunks populated by this granularity 
includes the given interval time chunk. The
+   * interval must fit exactly into the scheme of the granularity for this to 
return true
    */
   public abstract boolean isAligned(Interval interval);
 


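For readers skimming the patch, the alignment rule it enforces can be sketched in isolation. The snippet below is not Druid code: `AlignmentSketch`, `isAligned`, and `validate` are hypothetical names, a fixed-length UTC `Duration` stands in for Druid's `Granularity`, and a plain string stands in for `InsertCannotAllocateSegmentFault`. It assumes "aligned" means the interval is exactly one bucket of the requested granularity, which is what the `IntervalUtilsTest` cases in this commit suggest.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Illustrative sketch only; this is not Druid code. A fixed-length UTC bucket
 * stands in for Druid's Granularity, and a String stands in for the fault.
 */
final class AlignmentSketch
{
  /** Returns true only if [start, end) is exactly one bucket of the granularity. */
  static boolean isAligned(Instant start, Instant end, Duration granularity)
  {
    final long bucketMillis = granularity.toMillis();
    final long startMillis = start.toEpochMilli();
    final long endMillis = end.toEpochMilli();
    // The start must sit on a bucket boundary and the length must equal one bucket.
    return Math.floorMod(startMillis, bucketMillis) == 0
           && endMillis - startMillis == bucketMillis;
  }

  /** Hypothetical stand-in for the controller check: null means the allocation is accepted. */
  static String validate(Instant allocStart, Instant allocEnd, Duration requested)
  {
    if (isAligned(allocStart, allocEnd, requested)) {
      return null;
    }
    return String.format(
        "Requested granularity [%s], but got interval [%s/%s] instead",
        requested, allocStart, allocEnd
    );
  }

  public static void main(String[] args)
  {
    final Duration day = Duration.ofDays(1);
    // A DAY request answered with a day-long interval is accepted.
    System.out.println(
        validate(Instant.parse("2000-01-02T00:00:00Z"), Instant.parse("2000-01-03T00:00:00Z"), day)
    );
    // A DAY request answered with a month-long interval is rejected.
    System.out.println(
        validate(Instant.parse("2000-01-01T00:00:00Z"), Instant.parse("2000-02-01T00:00:00Z"), day)
    );
  }
}
```

The second call mirrors the scenario in `testInsertCannotAllocateSegmentFaultWhenInvalidAllocation`, where a DAY-partitioned insert receives a segment covering 2000-01-01/2000-02-01. Calendar-based granularities such as MONTH or YEAR have variable lengths, so this fixed-duration sketch does not cover them.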