This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4096f51f0b add configurable ColumnTypeMergePolicy to SegmentMetadataCache (#14319)
4096f51f0b is described below

commit 4096f51f0b29b3b4787ac12aa1a7493ac216c6c5
Author: Clint Wylie <[email protected]>
AuthorDate: Wed May 24 08:02:51 2023 -0700

    add configurable ColumnTypeMergePolicy to SegmentMetadataCache (#14319)
    
    This PR adds a new interface that controls how SegmentMetadataCache chooses
    a ColumnType when segments disagree about a column's type while computing
    the SQL schema. The interface is exposed as
    druid.sql.planner.metadataColumnTypeMergePolicy. It also adds a new 'least
    restrictive type' mode, which chooses the type that data across all
    segments can best be coerced into, and makes this mode the default
    behavior.
    
    This is a behavior change in when segment-driven schema migrations take
    effect for the SQL schema. With latestInterval, the SQL schema is updated
    as soon as the first job with the new schema has published segments; with
    leastRestrictive, the schema is only updated once all segments have been
    reindexed to the new type. The benefit of leastRestrictive is that it
    eliminates many of the type coercion errors that can occur in SQL when
    types vary across segments with la [...]
---
 docs/configuration/index.md                        |   1 +
 .../druid/math/expr/ExpressionTypeConversion.java  |  11 +-
 .../druid/segment/NestedDataColumnMerger.java      |   3 +
 .../apache/druid/segment/column/ColumnType.java    |  12 +-
 .../org/apache/druid/segment/column/Types.java     |   9 +
 .../druid/segment/column/ColumnTypeTest.java       |   4 +-
 .../sql/calcite/planner/CalcitePlannerModule.java  |   6 +-
 .../planner/SegmentMetadataCacheConfig.java        |  36 +--
 .../sql/calcite/schema/SegmentMetadataCache.java   | 145 ++++++++--
 .../planner/SegmentMetadataCacheConfigTest.java    |  78 ++++--
 .../calcite/schema/SegmentMetadataCacheTest.java   | 307 +++++++++++++++------
 11 files changed, 458 insertions(+), 154 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index abf21428fe..a367c6b597 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1953,6 +1953,7 @@ The Druid SQL server is configured through the following 
properties on the Broke
 |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at 
`/druid/v2/sql/`.|true|
 |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN 
query](../querying/topnquery.md). Higher limits will be planned as [GroupBy 
queries](../querying/groupbyquery.md) instead.|100000|
 |`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata 
refreshes.|PT1M|
+|`druid.sql.planner.metadataColumnTypeMergePolicy`|Defines how column types 
will be chosen when faced with differences between segments when computing the 
SQL schema. Options are specified as a JSON object, with valid choices of 
`leastRestrictive` or `latestInterval`. For `leastRestrictive`, Druid will 
automatically widen the type computed for the schema to a type which data 
across all segments can be converted into, however planned schema migrations 
can only take effect once all segment [...]
 |`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate 
cardinality algorithm for `COUNT(DISTINCT foo)`.|true|
 |`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when 
`useApproximateCountDistinct` is disabled. If set to true, exact distinct 
queries are re-written using grouping sets. Otherwise, exact distinct queries 
are re-written using joins. This should be set to true for group by query with 
multiple exact distinct aggregations. This flag can be overridden per 
query.|false|
 |`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN 
queries](../querying/topnquery.md) when a SQL query could be expressed as such. 
If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used 
instead.|true|
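
For reference, a minimal sketch of how the new option might be set in a Broker's
runtime.properties; the property name and the two accepted values come from the
table row added above, and leastRestrictive is the default:

    # illustrative runtime.properties excerpt (not part of this commit)
    druid.sql.planner.metadataColumnTypeMergePolicy=leastRestrictive
    # or, to keep the previous 'newest segment wins' behavior:
    # druid.sql.planner.metadataColumnTypeMergePolicy=latestInterval
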
diff --git 
a/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java
 
b/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java
index 584a77ed36..1221fa5500 100644
--- 
a/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java
+++ 
b/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.math.expr;
 
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.segment.column.Types;
 
 import javax.annotation.Nullable;
@@ -83,7 +82,7 @@ public class ExpressionTypeConversion
     }
     if (type.isArray() || other.isArray()) {
       if (!Objects.equals(type, other)) {
-        throw new IAE("Cannot implicitly cast %s to %s", type, other);
+        throw new Types.IncompatibleTypeException(type, other);
       }
       return type;
     }
@@ -95,7 +94,7 @@ public class ExpressionTypeConversion
         return type;
       }
       if (!Objects.equals(type, other)) {
-        throw new IAE("Cannot implicitly cast %s to %s", type, other);
+        throw new Types.IncompatibleTypeException(type, other);
       }
       return type;
     }
@@ -128,7 +127,7 @@ public class ExpressionTypeConversion
     // arrays cannot be auto converted
     if (type.isArray() || other.isArray()) {
       if (!Objects.equals(type, other)) {
-        throw new IAE("Cannot implicitly cast %s to %s", type, other);
+        throw new Types.IncompatibleTypeException(type, other);
       }
       return type;
     }
@@ -140,7 +139,7 @@ public class ExpressionTypeConversion
         return type;
       }
       if (!Objects.equals(type, other)) {
-        throw new IAE("Cannot implicitly cast %s to %s", type, other);
+        throw new Types.IncompatibleTypeException(type, other);
       }
       return type;
     }
@@ -177,7 +176,7 @@ public class ExpressionTypeConversion
       return ExpressionTypeFactory.getInstance().ofArray(newElementType);
     }
 
-    throw new IAE("Cannot implicitly cast %s to %s", type, other);
+    throw new Types.IncompatibleTypeException(type, other);
   }
 
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java 
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
index f1a4ea7bc0..c3a3047e0f 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
@@ -93,6 +93,9 @@ public class NestedDataColumnMerger implements 
DimensionMergerV9
         final IndexableAdapter.NestedColumnMergable mergable = closer.register(
             adapter.getNestedColumnMergeables(name)
         );
+        if (mergable == null) {
+          continue;
+        }
         final SortedValueDictionary dimValues = mergable.getValueDictionary();
 
         boolean allNulls = dimValues == null || dimValues.allNull();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java 
b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
index 9891bac9b4..dbfed07749 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
 
 import javax.annotation.Nullable;
@@ -152,7 +151,7 @@ public class ColumnType extends BaseTypeSignature<ValueType>
    *                                                                   
inference
    */
   @Nullable
-  public static ColumnType leastRestrictiveType(@Nullable ColumnType type, 
@Nullable ColumnType other)
+  public static ColumnType leastRestrictiveType(@Nullable ColumnType type, 
@Nullable ColumnType other) throws Types.IncompatibleTypeException
   {
     if (type == null) {
       return other;
@@ -168,7 +167,7 @@ public class ColumnType extends BaseTypeSignature<ValueType>
         return type;
       }
       if (!Objects.equals(type, other)) {
-        throw new IAE("Cannot implicitly cast %s to %s", type, other);
+        throw new Types.IncompatibleTypeException(type, other);
       }
       return type;
     }
@@ -177,7 +176,7 @@ public class ColumnType extends BaseTypeSignature<ValueType>
       if (ColumnType.NESTED_DATA.equals(type) || 
ColumnType.NESTED_DATA.equals(other)) {
         return ColumnType.NESTED_DATA;
       }
-      throw new IAE("Cannot implicitly cast %s to %s", type, other);
+      throw new Types.IncompatibleTypeException(type, other);
     }
 
     // arrays convert based on least restrictive element type
@@ -186,11 +185,13 @@ public class ColumnType extends 
BaseTypeSignature<ValueType>
         return type;
       }
       final ColumnType commonElementType;
+      // commonElementType cannot be null if we got this far, we always return 
a value unless both args are null
       if (other.isArray()) {
         commonElementType = leastRestrictiveType(
             (ColumnType) type.getElementType(),
             (ColumnType) other.getElementType()
         );
+
         return ColumnType.ofArray(commonElementType);
       } else {
         commonElementType = leastRestrictiveType(
@@ -218,13 +219,14 @@ public class ColumnType extends 
BaseTypeSignature<ValueType>
     }
 
     // all numbers win over longs
-    // floats vs doubles would be handled here, but we currently only support 
doubles...
     if (Types.is(type, ValueType.LONG) && Types.isNullOr(other, 
ValueType.LONG)) {
       return ColumnType.LONG;
     }
+    // doubles win over floats
     if (Types.is(type, ValueType.FLOAT) && Types.isNullOr(other, 
ValueType.FLOAT)) {
       return ColumnType.FLOAT;
     }
     return ColumnType.DOUBLE;
   }
+
 }
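
As a minimal sketch (not part of this commit), the numeric promotion rules visible
in the hunk above behave roughly as follows; only ColumnType constants already
referenced in this diff are used:

    import org.apache.druid.segment.column.ColumnType;

    public class LeastRestrictiveTypeSketch
    {
      public static void main(String[] args)
      {
        // matching longs stay LONG and matching floats stay FLOAT
        System.out.println(ColumnType.leastRestrictiveType(ColumnType.LONG, ColumnType.LONG));   // LONG
        System.out.println(ColumnType.leastRestrictiveType(ColumnType.FLOAT, ColumnType.FLOAT)); // FLOAT
        // any other numeric mix widens to DOUBLE
        System.out.println(ColumnType.leastRestrictiveType(ColumnType.LONG, ColumnType.FLOAT));  // DOUBLE
        // a null on either side simply yields the non-null type
        System.out.println(ColumnType.leastRestrictiveType(null, ColumnType.DOUBLE));            // DOUBLE
      }
    }
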
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/Types.java 
b/processing/src/main/java/org/apache/druid/segment/column/Types.java
index 44c7b13720..02da5c8bfa 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/Types.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/Types.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.column;
 
 import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nullable;
@@ -112,4 +113,12 @@ public class Types
     return (typeSignature1 != null && typeSignature1.is(typeDescriptor)) ||
            (typeSignature2 != null && typeSignature2.is(typeDescriptor));
   }
+
+  public static class IncompatibleTypeException extends IAE
+  {
+    public IncompatibleTypeException(TypeSignature<?> type, TypeSignature<?> 
other)
+    {
+      super("Cannot implicitly cast [%s] to [%s]", type, other);
+    }
+  }
 }
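
Because the new IncompatibleTypeException extends IAE (itself an
IllegalArgumentException), callers and tests that already catch
IllegalArgumentException keep working. A minimal sketch (not part of this commit):

    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.column.Types;

    public class IncompatibleTypeSketch
    {
      public static void main(String[] args)
      {
        try {
          // the exception message carries the two clashing type signatures
          throw new Types.IncompatibleTypeException(ColumnType.NESTED_DATA, ColumnType.STRING_ARRAY);
        }
        catch (IllegalArgumentException e) {
          // prints something like "Cannot implicitly cast [...] to [...]"
          System.out.println(e.getMessage());
        }
      }
    }
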
diff --git 
a/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java 
b/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java
index 89f0aeb80f..a66a9ad2d6 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java
@@ -163,8 +163,8 @@ public class ColumnTypeTest
     Assert.assertEquals(ColumnType.NESTED_DATA, 
ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, 
ColumnType.UNKNOWN_COMPLEX));
     Assert.assertEquals(SOME_COMPLEX, 
ColumnType.leastRestrictiveType(SOME_COMPLEX, ColumnType.UNKNOWN_COMPLEX));
     Assert.assertEquals(SOME_COMPLEX, 
ColumnType.leastRestrictiveType(ColumnType.UNKNOWN_COMPLEX, SOME_COMPLEX));
-    Assert.assertThrows(IllegalArgumentException.class, () -> 
ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, SOME_COMPLEX));
-    Assert.assertThrows(IllegalArgumentException.class, () -> 
ColumnType.leastRestrictiveType(ColumnType.STRING_ARRAY, SOME_COMPLEX));
+    Assert.assertThrows(Types.IncompatibleTypeException.class, () -> 
ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, SOME_COMPLEX));
+    Assert.assertThrows(Types.IncompatibleTypeException.class, () -> 
ColumnType.leastRestrictiveType(ColumnType.STRING_ARRAY, SOME_COMPLEX));
   }
 
   static class SomeOtherTypeSignature extends BaseTypeSignature<ValueType>
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java
index 3b8e8b7ec0..484de5c465 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java
@@ -31,6 +31,8 @@ import 
org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
  */
 public class CalcitePlannerModule implements Module
 {
+  public static final String CONFIG_BASE = "druid.sql.planner";
+
   @Override
   public void configure(Binder binder)
   {
@@ -38,8 +40,8 @@ public class CalcitePlannerModule implements Module
     // so both configs are bound to the same property prefix.
     // It turns out that the order of the arguments above is misleading.
     // We're actually binding the class to the config prefix, not the other 
way around.
-    JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class);
-    JsonConfigProvider.bind(binder, "druid.sql.planner", 
SegmentMetadataCacheConfig.class);
+    JsonConfigProvider.bind(binder, CONFIG_BASE, PlannerConfig.class);
+    JsonConfigProvider.bind(binder, CONFIG_BASE, 
SegmentMetadataCacheConfig.class);
     binder.bind(PlannerFactory.class).in(LazySingleton.class);
     binder.bind(DruidOperatorTable.class).in(LazySingleton.class);
     Multibinder.newSetBinder(binder, ExtensionCalciteRuleProvider.class);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java
index 17e342c4fe..dc4d94b78b 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java
@@ -20,10 +20,9 @@
 package org.apache.druid.sql.calcite.planner;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.sql.calcite.schema.SegmentMetadataCache;
 import org.joda.time.Period;
 
-import java.util.Objects;
-
 /**
  * Configuration properties for the Broker-side cache of segment metadata
  * used to infer datasources for SQL. This class shares the same config root
@@ -32,6 +31,9 @@ import java.util.Objects;
  */
 public class SegmentMetadataCacheConfig
 {
+  @JsonProperty
+  private boolean awaitInitializationOnStart = true;
+
   @JsonProperty
   private boolean metadataSegmentCacheEnable = false;
 
@@ -42,7 +44,8 @@ public class SegmentMetadataCacheConfig
   private Period metadataRefreshPeriod = new Period("PT1M");
 
   @JsonProperty
-  private boolean awaitInitializationOnStart = true;
+  private SegmentMetadataCache.ColumnTypeMergePolicy 
metadataColumnTypeMergePolicy =
+      new SegmentMetadataCache.LeastRestrictiveTypeMergePolicy();
 
   public static SegmentMetadataCacheConfig create()
   {
@@ -78,31 +81,9 @@ public class SegmentMetadataCacheConfig
     return metadataSegmentPollPeriod;
   }
 
-  @Override
-  public boolean equals(final Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    final SegmentMetadataCacheConfig that = (SegmentMetadataCacheConfig) o;
-    return awaitInitializationOnStart == that.awaitInitializationOnStart &&
-           metadataSegmentCacheEnable == that.metadataSegmentCacheEnable &&
-           metadataSegmentPollPeriod == that.metadataSegmentPollPeriod &&
-           Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod);
-  }
-
-  @Override
-  public int hashCode()
+  public SegmentMetadataCache.ColumnTypeMergePolicy 
getMetadataColumnTypeMergePolicy()
   {
-    return Objects.hash(
-        metadataRefreshPeriod,
-        awaitInitializationOnStart,
-        metadataSegmentCacheEnable,
-        metadataSegmentPollPeriod
-    );
+    return metadataColumnTypeMergePolicy;
   }
 
   @Override
@@ -113,6 +94,7 @@ public class SegmentMetadataCacheConfig
            ", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable +
            ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
            ", awaitInitializationOnStart=" + awaitInitializationOnStart +
+           ", metadataColumnTypeMergePolicy=" + metadataColumnTypeMergePolicy +
            '}';
   }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
index 8da4e647a9..71c19734ec 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite.schema;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -36,6 +37,7 @@ import org.apache.druid.client.ServerView;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -56,6 +58,7 @@ import 
org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.Types;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.SegmentManager;
@@ -75,6 +78,7 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -107,7 +111,6 @@ public class SegmentMetadataCache
   private static final int MAX_SEGMENTS_PER_QUERY = 15000;
   private static final long DEFAULT_NUM_ROWS = 0;
   private static final Interner<RowSignature> ROW_SIGNATURE_INTERNER = 
Interners.newWeakInterner();
-
   private final QueryLifecycleFactory queryLifecycleFactory;
   private final SegmentMetadataCacheConfig config;
   // Escalator, so we can attach an authentication result to queries we 
generate.
@@ -117,6 +120,7 @@ public class SegmentMetadataCache
   private final ExecutorService cacheExec;
   private final ExecutorService callbackExec;
   private final ServiceEmitter emitter;
+  private final ColumnTypeMergePolicy columnTypeMergePolicy;
 
   /**
    * Map of DataSource -> DruidTable.
@@ -229,6 +233,7 @@ public class SegmentMetadataCache
     this.segmentManager = segmentManager;
     this.joinableFactory = joinableFactory;
     this.config = Preconditions.checkNotNull(config, "config");
+    this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
     this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
     this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
     this.escalator = escalator;
@@ -803,25 +808,11 @@ public class SegmentMetadataCache
         final RowSignature rowSignature = 
availableSegmentMetadata.getRowSignature();
         if (rowSignature != null) {
           for (String column : rowSignature.getColumnNames()) {
-            // Newer column types should override older ones.
             final ColumnType columnType =
                 rowSignature.getColumnType(column)
                             .orElseThrow(() -> new ISE("Encountered null type 
for column [%s]", column));
 
-            columnTypes.compute(column, (c, existingType) -> {
-              if (existingType == null) {
-                return columnType;
-              }
-              if (columnType == null) {
-                return existingType;
-              }
-              // if any are json, are all json
-              if (ColumnType.NESTED_DATA.equals(columnType) || 
ColumnType.NESTED_DATA.equals(existingType)) {
-                return ColumnType.NESTED_DATA;
-              }
-              // "existing type" is the 'newest' type, since we iterate the 
segments list by newest start time
-              return existingType;
-            });
+            columnTypes.compute(column, (c, existingType) -> 
columnTypeMergePolicy.merge(existingType, columnType));
           }
         }
       }
@@ -995,4 +986,126 @@ public class SegmentMetadataCache
       runnable.run();
     }
   }
+
+
+  /**
+   * ColumnTypeMergePolicy defines the rules of which type to use when faced 
with the possibility of different types
+   * for the same column from segment to segment. It is used to help compute a 
{@link RowSignature} for a table in
+   * Druid based on the segment metadata of all segments, merging the types of 
each column encountered to end up with
+   * a single type to represent it globally.
+   */
+  @FunctionalInterface
+  public interface ColumnTypeMergePolicy
+  {
+    ColumnType merge(ColumnType existingType, ColumnType newType);
+
+    @JsonCreator
+    static ColumnTypeMergePolicy fromString(String type)
+    {
+      if (LeastRestrictiveTypeMergePolicy.NAME.equalsIgnoreCase(type)) {
+        return LeastRestrictiveTypeMergePolicy.INSTANCE;
+      }
+      if (FirstTypeMergePolicy.NAME.equalsIgnoreCase(type)) {
+        return FirstTypeMergePolicy.INSTANCE;
+      }
+      throw new IAE("Unknown type [%s]", type);
+    }
+  }
+
+  /**
+   * Classic logic, we use the first type we encounter. This policy is 
effectively 'newest first' because we iterated
+   * segments starting from the most recent time chunk, so this typically 
results in the most recently used type being
+   * chosen, at least for systems that are continuously updated with 'current' 
data.
+   *
+   * Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, 
at least in systems using SQL schemas which
+   * are partially or fully computed by this cache, this merge policy can 
result in query time errors if incompatible
+   * types are mixed if the chosen type is more restrictive than the types of 
some segments. If data is likely to vary
+   * in type across segments, consider using {@link 
LeastRestrictiveTypeMergePolicy} instead.
+   */
+  public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy
+  {
+    public static final String NAME = "latestInterval";
+    private static final FirstTypeMergePolicy INSTANCE = new 
FirstTypeMergePolicy();
+
+    @Override
+    public ColumnType merge(ColumnType existingType, ColumnType newType)
+    {
+      if (existingType == null) {
+        return newType;
+      }
+      if (newType == null) {
+        return existingType;
+      }
+      // if any are json, are all json
+      if (ColumnType.NESTED_DATA.equals(newType) || 
ColumnType.NESTED_DATA.equals(existingType)) {
+        return ColumnType.NESTED_DATA;
+      }
+      // "existing type" is the 'newest' type, since we iterate the segments 
list by newest start time
+      return existingType;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(NAME);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public String toString()
+    {
+      return NAME;
+    }
+  }
+
+  /**
+   * Resolves types using {@link ColumnType#leastRestrictiveType(ColumnType, 
ColumnType)} to find the ColumnType that
+   * can best represent all data contained across all segments.
+   */
+  public static class LeastRestrictiveTypeMergePolicy implements 
ColumnTypeMergePolicy
+  {
+    public static final String NAME = "leastRestrictive";
+    private static final LeastRestrictiveTypeMergePolicy INSTANCE = new 
LeastRestrictiveTypeMergePolicy();
+
+    @Override
+    public ColumnType merge(ColumnType existingType, ColumnType newType)
+    {
+      try {
+        return ColumnType.leastRestrictiveType(existingType, newType);
+      }
+      catch (Types.IncompatibleTypeException incompatibleTypeException) {
+        // fall back to first encountered type if they are not compatible for 
some reason
+        return FirstTypeMergePolicy.INSTANCE.merge(existingType, newType);
+      }
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(NAME);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public String toString()
+    {
+      return NAME;
+    }
+  }
 }
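
A minimal sketch (not part of this commit) of how the two policies added above
resolve a column whose type differs between segments; it relies only on the
public nested types shown in this hunk and ColumnType constants used elsewhere
in the diff:

    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.sql.calcite.schema.SegmentMetadataCache.ColumnTypeMergePolicy;

    public class MergePolicySketch
    {
      public static void main(String[] args)
      {
        // the names match what ColumnTypeMergePolicy.fromString() accepts
        ColumnTypeMergePolicy latest = ColumnTypeMergePolicy.fromString("latestInterval");
        ColumnTypeMergePolicy least = ColumnTypeMergePolicy.fromString("leastRestrictive");

        // 'latestInterval' keeps the type already in the cache, i.e. the newest interval seen so far
        System.out.println(latest.merge(ColumnType.LONG, ColumnType.DOUBLE)); // LONG
        // 'leastRestrictive' widens to a type that data from all segments can be coerced into
        System.out.println(least.merge(ColumnType.LONG, ColumnType.DOUBLE));  // DOUBLE
      }
    }
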
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java
index 6afa5c7728..858f2e5d8a 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java
@@ -19,12 +19,17 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.sql.calcite.schema.SegmentMetadataCache;
 import org.joda.time.Period;
+import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import java.util.Properties;
 
 /**
  * Pathetic little unit test just to keep Jacoco happy.
@@ -32,19 +37,60 @@ import static org.junit.Assert.assertTrue;
 public class SegmentMetadataCacheConfigTest
 {
   @Test
-  public void testConfig()
+  public void testDefaultConfig()
   {
-    SegmentMetadataCacheConfig config = 
SegmentMetadataCacheConfig.create("PT1M");
-    assertEquals(Period.minutes(1), config.getMetadataRefreshPeriod());
-    assertTrue(config.isAwaitInitializationOnStart());
-    // Not legal per IntelliJ inspections. Should be testable, but IntelliJ
-    // won't allow this code.
-    //assertTrue(config.equals(config));
-    // Workaround
-    assertTrue(config.equals(SegmentMetadataCacheConfig.create("PT1M")));
-    assertFalse(config.equals(null));
-    assertTrue(config.equals(SegmentMetadataCacheConfig.create("PT1M")));
-    assertFalse(config.equals(SegmentMetadataCacheConfig.create("PT2M")));
-    assertTrue(config.hashCode() != 0);
+    final Injector injector = createInjector();
+    final JsonConfigProvider<SegmentMetadataCacheConfig> provider = 
JsonConfigProvider.of(
+        CalcitePlannerModule.CONFIG_BASE,
+        SegmentMetadataCacheConfig.class
+    );
+    final Properties properties = new Properties();
+    provider.inject(properties, injector.getInstance(JsonConfigurator.class));
+    final SegmentMetadataCacheConfig config = provider.get();
+    Assert.assertTrue(config.isAwaitInitializationOnStart());
+    Assert.assertFalse(config.isMetadataSegmentCacheEnable());
+    Assert.assertEquals(Period.minutes(1), config.getMetadataRefreshPeriod());
+    Assert.assertEquals(60_000, config.getMetadataSegmentPollPeriod());
+    Assert.assertEquals(new 
SegmentMetadataCache.LeastRestrictiveTypeMergePolicy(), 
config.getMetadataColumnTypeMergePolicy());
+  }
+
+  @Test
+  public void testCustomizedConfig()
+  {
+    final Injector injector = createInjector();
+    final JsonConfigProvider<SegmentMetadataCacheConfig> provider = 
JsonConfigProvider.of(
+        CalcitePlannerModule.CONFIG_BASE,
+        SegmentMetadataCacheConfig.class
+    );
+    final Properties properties = new Properties();
+    properties.setProperty(
+        CalcitePlannerModule.CONFIG_BASE + ".metadataColumnTypeMergePolicy",
+        "latestInterval"
+    );
+    properties.setProperty(CalcitePlannerModule.CONFIG_BASE + 
".metadataRefreshPeriod", "PT2M");
+    properties.setProperty(CalcitePlannerModule.CONFIG_BASE + 
".metadataSegmentPollPeriod", "15000");
+    properties.setProperty(CalcitePlannerModule.CONFIG_BASE + 
".metadataSegmentCacheEnable", "true");
+    properties.setProperty(CalcitePlannerModule.CONFIG_BASE + 
".awaitInitializationOnStart", "false");
+    provider.inject(properties, injector.getInstance(JsonConfigurator.class));
+    final SegmentMetadataCacheConfig config = provider.get();
+    Assert.assertFalse(config.isAwaitInitializationOnStart());
+    Assert.assertTrue(config.isMetadataSegmentCacheEnable());
+    Assert.assertEquals(Period.minutes(2), config.getMetadataRefreshPeriod());
+    Assert.assertEquals(15_000, config.getMetadataSegmentPollPeriod());
+    Assert.assertEquals(
+        new SegmentMetadataCache.FirstTypeMergePolicy(),
+        config.getMetadataColumnTypeMergePolicy()
+    );
+  }
+
+  private Injector createInjector()
+  {
+    return GuiceInjectors.makeStartupInjectorWithModules(
+        ImmutableList.of(
+            binder -> {
+              JsonConfigProvider.bind(binder, 
CalcitePlannerModule.CONFIG_BASE, SegmentMetadataCacheConfig.class);
+            }
+        )
+    );
   }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
index 213ad453a6..fb55d22710 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
@@ -32,6 +32,10 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -64,10 +68,12 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AllowAllAuthenticator;
 import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig;
 import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.sql.calcite.util.TestDataBuilder;
 import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
@@ -137,6 +143,78 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
                                               )
                                               .rows(ROWS2)
                                               .buildMMappedIndex();
+
+    final InputRowSchema rowSchema = new InputRowSchema(
+        new TimestampSpec("t", null, null),
+        DimensionsSpec.builder().useSchemaDiscovery(true).build(),
+        null
+    );
+    final List<InputRow> autoRows1 = ImmutableList.of(
+        TestDataBuilder.createRow(
+            ImmutableMap.<String, Object>builder()
+                        .put("t", "2023-01-01T00:00Z")
+                        .put("numbery", 1.1f)
+                        .put("numberyArrays", ImmutableList.of(1L, 2L, 3L))
+                        .put("stringy", ImmutableList.of("a", "b", "c"))
+                        .put("array", ImmutableList.of(1.1, 2.2, 3.3))
+                        .put("nested", ImmutableMap.of("x", 1L, "y", 2L))
+                        .build(),
+            rowSchema
+        )
+    );
+    final List<InputRow> autoRows2 = ImmutableList.of(
+        TestDataBuilder.createRow(
+            ImmutableMap.<String, Object>builder()
+                        .put("t", "2023-01-02T00:00Z")
+                        .put("numbery", 1L)
+                        .put("numberyArrays", ImmutableList.of(3.3, 2.2, 3.1))
+                        .put("stringy", "a")
+                        .put("array", ImmutableList.of(1L, 2L, 3L))
+                        .put("nested", "hello")
+                        .build(),
+            rowSchema
+        )
+    );
+    final QueryableIndex indexAuto1 = IndexBuilder.create()
+                                              .tmpDir(new File(tmpDir, "1"))
+                                              
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                              .schema(
+                                                  new 
IncrementalIndexSchema.Builder()
+                                                      
.withTimestampSpec(rowSchema.getTimestampSpec())
+                                                      
.withDimensionsSpec(rowSchema.getDimensionsSpec())
+                                                      .withMetrics(
+                                                          new 
CountAggregatorFactory("cnt"),
+                                                          new 
DoubleSumAggregatorFactory("m1", "m1"),
+                                                          new 
HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                                                      )
+                                                      .withRollup(false)
+                                                      .build()
+                                              )
+                                              .rows(autoRows1)
+                                              .buildMMappedIndex();
+
+    final QueryableIndex indexAuto2 = IndexBuilder.create()
+                                                  .tmpDir(new File(tmpDir, 
"1"))
+                                                  
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                                  .schema(
+                                                      new 
IncrementalIndexSchema.Builder()
+                                                          .withTimestampSpec(
+                                                              new 
TimestampSpec("t", null, null)
+                                                          )
+                                                          .withDimensionsSpec(
+                                                              
DimensionsSpec.builder().useSchemaDiscovery(true).build()
+                                                          )
+                                                          .withMetrics(
+                                                              new 
CountAggregatorFactory("cnt"),
+                                                              new 
DoubleSumAggregatorFactory("m1", "m1"),
+                                                              new 
HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                                                          )
+                                                          .withRollup(false)
+                                                          .build()
+                                                  )
+                                                  .rows(autoRows2)
+                                                  .buildMMappedIndex();
+
     walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
         DataSegment.builder()
                    .dataSource(CalciteTests.DATASOURCE1)
@@ -164,6 +242,24 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
                    .size(0)
                    .build(),
         index2
+    ).add(
+        DataSegment.builder()
+                   .dataSource(CalciteTests.SOME_DATASOURCE)
+                   .interval(Intervals.of("2023-01-01T00Z/P1D"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(1))
+                   .size(0)
+                   .build(),
+        indexAuto1
+    ).add(
+        DataSegment.builder()
+                   .dataSource(CalciteTests.SOME_DATASOURCE)
+                   .interval(Intervals.of("2023-01-02T00Z/P1D"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(1))
+                   .size(0)
+                   .build(),
+        indexAuto2
     );
     final DataSegment segment1 = new DataSegment(
         "foo3",
@@ -183,45 +279,12 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     druidServers = serverView.getDruidServers();
   }
 
-  public SegmentMetadataCache buildSchema1() throws InterruptedException
+  public SegmentMetadataCache buildSchemaMarkAndTableLatch() throws 
InterruptedException
   {
-    Preconditions.checkState(runningSchema == null);
-    runningSchema = new SegmentMetadataCache(
-        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
-        serverView,
-        segmentManager,
-        new MapJoinableFactory(
-            ImmutableSet.of(globalTableJoinable),
-            ImmutableMap.of(globalTableJoinable.getClass(), 
GlobalTableDataSource.class)
-        ),
-        SEGMENT_CACHE_CONFIG_DEFAULT,
-        new NoopEscalator(),
-        new BrokerInternalQueryConfig(),
-        new NoopServiceEmitter()
-    )
-    {
-      @Override
-      protected DatasourceTable.PhysicalDatasourceMetadata 
buildDruidTable(String dataSource)
-      {
-        DatasourceTable.PhysicalDatasourceMetadata table = 
super.buildDruidTable(dataSource);
-        buildTableLatch.countDown();
-        return table;
-      }
-
-      @Override
-      void markDataSourceAsNeedRebuild(String datasource)
-      {
-        super.markDataSourceAsNeedRebuild(datasource);
-        markDataSourceLatch.countDown();
-      }
-    };
-
-    runningSchema.start();
-    runningSchema.awaitInitialization();
-    return runningSchema;
+    return buildSchemaMarkAndTableLatch(SEGMENT_CACHE_CONFIG_DEFAULT);
   }
 
-  public SegmentMetadataCache buildSchema2() throws InterruptedException
+  public SegmentMetadataCache 
buildSchemaMarkAndTableLatch(SegmentMetadataCacheConfig config) throws 
InterruptedException
   {
     Preconditions.checkState(runningSchema == null);
     runningSchema = new SegmentMetadataCache(
@@ -232,14 +295,12 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
             ImmutableSet.of(globalTableJoinable),
             ImmutableMap.of(globalTableJoinable.getClass(), 
GlobalTableDataSource.class)
         ),
-        SEGMENT_CACHE_CONFIG_DEFAULT,
+        config,
         new NoopEscalator(),
         new BrokerInternalQueryConfig(),
         new NoopServiceEmitter()
     )
     {
-      boolean throwException = true;
-
       @Override
       protected DatasourceTable.PhysicalDatasourceMetadata 
buildDruidTable(String dataSource)
       {
@@ -248,17 +309,6 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
         return table;
       }
 
-      @Override
-      protected Set<SegmentId> refreshSegments(final Set<SegmentId> segments) 
throws IOException
-      {
-        if (throwException) {
-          throwException = false;
-          throw new RuntimeException("Query[xxxx] 
url[http://xxxx:8083/druid/v2/] timed out.");
-        } else {
-          return super.refreshSegments(segments);
-        }
-      }
-
       @Override
       void markDataSourceAsNeedRebuild(String datasource)
       {
@@ -272,7 +322,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     return runningSchema;
   }
 
-  public SegmentMetadataCache buildSchema3() throws InterruptedException
+  public SegmentMetadataCache buildSchemaMarkAndRefreshLatch() throws 
InterruptedException
   {
     Preconditions.checkState(runningSchema == null);
     runningSchema = new SegmentMetadataCache(
@@ -322,24 +372,24 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
   @Test
   public void testGetTableMap() throws InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
-    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), 
schema.getDatasourceNames());
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
+    Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, 
CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), 
schema.getDatasourceNames());
 
     final Set<String> tableNames = schema.getDatasourceNames();
-    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableNames);
+    Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, 
CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), tableNames);
   }
 
   @Test
   public void testSchemaInit() throws InterruptedException
   {
-    SegmentMetadataCache schema2 = buildSchema1();
-    Assert.assertEquals(ImmutableSet.of("foo", "foo2"), 
schema2.getDatasourceNames());
+    SegmentMetadataCache schema2 = buildSchemaMarkAndTableLatch();
+    Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, 
CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), 
schema2.getDatasourceNames());
   }
 
   @Test
   public void testGetTableMapFoo() throws InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
     final DatasourceTable.PhysicalDatasourceMetadata fooDs = 
schema.getDatasource("foo");
     final DruidTable fooTable = new DatasourceTable(fooDs);
     final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
@@ -354,7 +404,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     Assert.assertEquals(SqlTypeName.VARCHAR, 
fields.get(1).getType().getSqlTypeName());
 
     Assert.assertEquals("m1", fields.get(2).getName());
-    Assert.assertEquals(SqlTypeName.BIGINT, 
fields.get(2).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.DOUBLE, 
fields.get(2).getType().getSqlTypeName());
 
     Assert.assertEquals("dim1", fields.get(3).getName());
     Assert.assertEquals(SqlTypeName.VARCHAR, 
fields.get(3).getType().getSqlTypeName());
@@ -369,7 +419,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
   @Test
   public void testGetTableMapFoo2() throws InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
     final DatasourceTable.PhysicalDatasourceMetadata fooDs = 
schema.getDatasource("foo2");
     final DruidTable fooTable = new DatasourceTable(fooDs);
     final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
@@ -387,6 +437,103 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     Assert.assertEquals(SqlTypeName.BIGINT, 
fields.get(2).getType().getSqlTypeName());
   }
 
+  @Test
+  public void testGetTableMapSomeTable() throws InterruptedException
+  {
+    // using 'newest first' column type merge strategy, the types are expected 
to be the types defined in the newer
+    // segment, except for json, which is special handled
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(
+        new SegmentMetadataCacheConfig() {
+          @Override
+          public SegmentMetadataCache.ColumnTypeMergePolicy 
getMetadataColumnTypeMergePolicy()
+          {
+            return new SegmentMetadataCache.FirstTypeMergePolicy();
+          }
+        }
+    );
+    final DatasourceTable.PhysicalDatasourceMetadata fooDs = 
schema.getDatasource(CalciteTests.SOME_DATASOURCE);
+    final DruidTable table = new DatasourceTable(fooDs);
+    final RelDataType rowType = table.getRowType(new JavaTypeFactoryImpl());
+    final List<RelDataTypeField> fields = rowType.getFieldList();
+
+    Assert.assertEquals(9, fields.size());
+
+    Assert.assertEquals("__time", fields.get(0).getName());
+    Assert.assertEquals(SqlTypeName.TIMESTAMP, 
fields.get(0).getType().getSqlTypeName());
+
+    Assert.assertEquals("numbery", fields.get(1).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, 
fields.get(1).getType().getSqlTypeName());
+
+    Assert.assertEquals("numberyArrays", fields.get(2).getName());
+    Assert.assertEquals(SqlTypeName.ARRAY, 
fields.get(2).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.DOUBLE, 
fields.get(2).getType().getComponentType().getSqlTypeName());
+
+    Assert.assertEquals("stringy", fields.get(3).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, 
fields.get(3).getType().getSqlTypeName());
+
+    Assert.assertEquals("array", fields.get(4).getName());
+    Assert.assertEquals(SqlTypeName.ARRAY, 
fields.get(4).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.BIGINT, 
fields.get(4).getType().getComponentType().getSqlTypeName());
+
+    Assert.assertEquals("nested", fields.get(5).getName());
+    Assert.assertEquals(SqlTypeName.OTHER, 
fields.get(5).getType().getSqlTypeName());
+
+    Assert.assertEquals("cnt", fields.get(6).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, 
fields.get(6).getType().getSqlTypeName());
+
+    Assert.assertEquals("m1", fields.get(7).getName());
+    Assert.assertEquals(SqlTypeName.DOUBLE, 
fields.get(7).getType().getSqlTypeName());
+
+    Assert.assertEquals("unique_dim1", fields.get(8).getName());
+    Assert.assertEquals(SqlTypeName.OTHER, 
fields.get(8).getType().getSqlTypeName());
+  }
+
+  @Test
+  public void testGetTableMapSomeTableLeastRestrictiveTypeMerge() throws 
InterruptedException
+  {
+    // using 'least restrictive' column type merge strategy, the types are 
expected to be the types defined as the
+    // least restrictive blend across all segments
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
+    final DatasourceTable.PhysicalDatasourceMetadata fooDs = 
schema.getDatasource(CalciteTests.SOME_DATASOURCE);
+    final DruidTable table = new DatasourceTable(fooDs);
+    final RelDataType rowType = table.getRowType(new JavaTypeFactoryImpl());
+    final List<RelDataTypeField> fields = rowType.getFieldList();
+
+    Assert.assertEquals(9, fields.size());
+
+    Assert.assertEquals("__time", fields.get(0).getName());
+    Assert.assertEquals(SqlTypeName.TIMESTAMP, 
fields.get(0).getType().getSqlTypeName());
+
+    Assert.assertEquals("numbery", fields.get(1).getName());
+    Assert.assertEquals(SqlTypeName.DOUBLE, 
fields.get(1).getType().getSqlTypeName());
+
+    Assert.assertEquals("numberyArrays", fields.get(2).getName());
+    Assert.assertEquals(SqlTypeName.ARRAY, 
fields.get(2).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.DOUBLE, 
fields.get(2).getType().getComponentType().getSqlTypeName());
+
+    Assert.assertEquals("stringy", fields.get(3).getName());
+    Assert.assertEquals(SqlTypeName.ARRAY, 
fields.get(3).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, 
fields.get(3).getType().getComponentType().getSqlTypeName());
+
+    Assert.assertEquals("array", fields.get(4).getName());
+    Assert.assertEquals(SqlTypeName.ARRAY, 
fields.get(4).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.DOUBLE, 
fields.get(4).getType().getComponentType().getSqlTypeName());
+
+    Assert.assertEquals("nested", fields.get(5).getName());
+    Assert.assertEquals(SqlTypeName.OTHER, 
fields.get(5).getType().getSqlTypeName());
+
+    Assert.assertEquals("cnt", fields.get(6).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, 
fields.get(6).getType().getSqlTypeName());
+
+    Assert.assertEquals("m1", fields.get(7).getName());
+    Assert.assertEquals(SqlTypeName.DOUBLE, 
fields.get(7).getType().getSqlTypeName());
+
+    Assert.assertEquals("unique_dim1", fields.get(8).getName());
+    Assert.assertEquals(SqlTypeName.OTHER, 
fields.get(8).getType().getSqlTypeName());
+  }
+
+
+
   /**
    * This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct 
in case
    * of multiple replicas i.e. when {@link 
SegmentMetadataCache#addSegment(DruidServerMetadata, DataSegment)}
@@ -396,13 +543,13 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
   @Test
   public void testAvailableSegmentMetadataNumRows() throws InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
     Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = 
schema.getSegmentMetadataSnapshot();
     final List<DataSegment> segments = segmentsMetadata.values()
                                                        .stream()
                                                        
.map(AvailableSegmentMetadata::getSegment)
                                                        
.collect(Collectors.toList());
-    Assert.assertEquals(4, segments.size());
+    Assert.assertEquals(6, segments.size());
     // find the only segment with datasource "foo2"
     final DataSegment existingSegment = segments.stream()
                                                 .filter(segment -> 
segment.getDataSource().equals("foo2"))
@@ -446,13 +593,13 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
   @Test
   public void testNullDatasource() throws IOException, InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
     final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = 
schema.getSegmentMetadataSnapshot();
     final List<DataSegment> segments = segmentMetadatas.values()
                                                        .stream()
                                                        
.map(AvailableSegmentMetadata::getSegment)
                                                        
.collect(Collectors.toList());
-    Assert.assertEquals(4, segments.size());
+    Assert.assertEquals(6, segments.size());
     // segments contains two segments with datasource "foo" and one with 
datasource "foo2"
     // let's remove the only segment with datasource "foo2"
     final DataSegment segmentToRemove = segments.stream()
@@ -465,19 +612,19 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     // The following line can cause NPE without segmentMetadata null check in
     // SegmentMetadataCache#refreshSegmentsForDataSource
     
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
-    Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
+    Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
   }
 
   @Test
   public void testNullAvailableSegmentMetadata() throws IOException, 
InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
     final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = 
schema.getSegmentMetadataSnapshot();
     final List<DataSegment> segments = segmentMetadatas.values()
                                                        .stream()
                                                        
.map(AvailableSegmentMetadata::getSegment)
                                                        
.collect(Collectors.toList());
-    Assert.assertEquals(4, segments.size());
+    Assert.assertEquals(6, segments.size());
     // remove one of the segments with datasource "foo"
     final DataSegment segmentToRemove = segments.stream()
                                                 .filter(segment -> 
segment.getDataSource().equals("foo"))
@@ -489,13 +636,13 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     // The following line can cause NPE without segmentMetadata null check in
     // SegmentMetadataCache#refreshSegmentsForDataSource
     
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
-    Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
+    Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
   }
 
   @Test
   public void testAvailableSegmentMetadataIsRealtime() throws 
InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
     Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = 
schema.getSegmentMetadataSnapshot();
     final List<DataSegment> segments = segmentsMetadata.values()
                                                        .stream()
@@ -576,7 +723,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.addSegment(newSegment(datasource, 1), ServerType.HISTORICAL);
     Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(5, schema.getTotalSegments());
+    Assert.assertEquals(7, schema.getTotalSegments());
     List<AvailableSegmentMetadata> metadatas = schema
         .getSegmentMetadataSnapshot()
         .values()
@@ -621,7 +768,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.addSegment(segment, ServerType.HISTORICAL);
     Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(5, schema.getTotalSegments());
+    Assert.assertEquals(7, schema.getTotalSegments());
     List<AvailableSegmentMetadata> metadatas = schema
         .getSegmentMetadataSnapshot()
         .values()
@@ -666,7 +813,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.addSegment(newSegment(datasource, 1), ServerType.REALTIME);
     Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(5, schema.getTotalSegments());
+    Assert.assertEquals(7, schema.getTotalSegments());
     List<AvailableSegmentMetadata> metadatas = schema
         .getSegmentMetadataSnapshot()
         .values()
@@ -710,7 +857,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.addSegment(newSegment(datasource, 1), ServerType.BROKER);
     Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(4, schema.getTotalSegments());
+    Assert.assertEquals(6, schema.getTotalSegments());
     List<AvailableSegmentMetadata> metadatas = schema
         .getSegmentMetadataSnapshot()
         .values()
@@ -765,7 +912,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.removeSegment(segment, ServerType.REALTIME);
     Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(4, schema.getTotalSegments());
+    Assert.assertEquals(6, schema.getTotalSegments());
     List<AvailableSegmentMetadata> metadatas = schema
         .getSegmentMetadataSnapshot()
         .values()
@@ -827,7 +974,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.removeSegment(segments.get(0), ServerType.REALTIME);
     Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(5, schema.getTotalSegments());
+    Assert.assertEquals(7, schema.getTotalSegments());
     List<AvailableSegmentMetadata> metadatas = schema
         .getSegmentMetadataSnapshot()
         .values()
@@ -872,7 +1019,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.removeSegment(newSegment(datasource, 1), ServerType.HISTORICAL);
     Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(4, schema.getTotalSegments());
+    Assert.assertEquals(6, schema.getTotalSegments());
   }
 
   @Test
@@ -919,7 +1066,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.removeSegment(segment, ServerType.BROKER);
     Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(5, schema.getTotalSegments());
+    Assert.assertEquals(7, schema.getTotalSegments());
     
Assert.assertTrue(schema.getDataSourcesNeedingRebuild().contains(datasource));
   }
 
@@ -967,7 +1114,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     serverView.removeSegment(segment, ServerType.HISTORICAL);
     Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS));
 
-    Assert.assertEquals(5, schema.getTotalSegments());
+    Assert.assertEquals(7, schema.getTotalSegments());
     List<AvailableSegmentMetadata> metadatas = schema
         .getSegmentMetadataSnapshot()
         .values()
@@ -995,7 +1142,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
   @Test
   public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws 
InterruptedException
   {
-    SegmentMetadataCache schema3 = buildSchema3();
+    SegmentMetadataCache schema3 = buildSchemaMarkAndRefreshLatch();
     Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
     DatasourceTable.PhysicalDatasourceMetadata fooTable = 
schema3.getDatasource("foo");
     Assert.assertNotNull(fooTable);
@@ -1061,7 +1208,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
   @Test
   public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() 
throws InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema3();
+    SegmentMetadataCache schema = buildSchemaMarkAndRefreshLatch();
     Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
     DatasourceTable.PhysicalDatasourceMetadata fooTable = 
schema.getDatasource("foo");
     Assert.assertNotNull(fooTable);
@@ -1291,7 +1438,7 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
   @Test
   public void testStaleDatasourceRefresh() throws IOException, 
InterruptedException
   {
-    SegmentMetadataCache schema = buildSchema1();
+    SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
     Set<SegmentId> segments = new HashSet<>();
     Set<String> datasources = new HashSet<>();
     datasources.add("wat");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
