clintropolis commented on code in PR #13803:
URL: https://github.com/apache/druid/pull/13803#discussion_r1148812179
##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -86,99 +94,129 @@ public NestedDataColumnMerger(
@Override
public void writeMergedValueDictionary(List<IndexableAdapter> adapters)
throws IOException
{
+ try {
+ long dimStartTime = System.currentTimeMillis();
+
+ int numMergeIndex = 0;
+ GlobalDictionarySortedCollector sortedLookup = null;
+ final Indexed[] sortedLookups = new Indexed[adapters.size()];
+ final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
+ final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+ final Iterable<Object[]>[] sortedArrayLookups = new
Iterable[adapters.size()];
+
+ final SortedMap<String, NestedFieldTypeInfo.MutableTypeSet> mergedFields
= new TreeMap<>();
+
+ for (int i = 0; i < adapters.size(); i++) {
+ final IndexableAdapter adapter = adapters.get(i);
+ final GlobalDictionarySortedCollector dimValues;
+ if (adapter instanceof IncrementalIndexAdapter) {
+ dimValues =
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter,
mergedFields);
+ } else if (adapter instanceof QueryableIndexIndexableAdapter) {
+ dimValues =
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter,
mergedFields);
+ } else {
+ throw new ISE("Unable to merge columns of unsupported adapter [%s]",
adapter.getClass());
+ }
- long dimStartTime = System.currentTimeMillis();
-
- int numMergeIndex = 0;
- GlobalDictionarySortedCollector sortedLookup = null;
- final Indexed[] sortedLookups = new Indexed[adapters.size()];
- final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
- final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+ boolean allNulls = dimValues == null || dimValues.allNull();
+ sortedLookup = dimValues;
+ if (!allNulls) {
+ sortedLookups[i] = dimValues.getSortedStrings();
+ sortedLongLookups[i] = dimValues.getSortedLongs();
+ sortedDoubleLookups[i] = dimValues.getSortedDoubles();
+ sortedArrayLookups[i] = dimValues.getSortedArrays();
+ numMergeIndex++;
+ }
+ }
- final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields
= new TreeMap<>();
+ descriptorBuilder = new ColumnDescriptor.Builder();
- for (int i = 0; i < adapters.size(); i++) {
- final IndexableAdapter adapter = adapters.get(i);
- final GlobalDictionarySortedCollector dimValues;
- if (adapter instanceof IncrementalIndexAdapter) {
- dimValues =
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter,
mergedFields);
- } else if (adapter instanceof QueryableIndexIndexableAdapter) {
- dimValues =
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter,
mergedFields);
+ final NestedDataColumnSerializer defaultSerializer = new
NestedDataColumnSerializer(
+ name,
+ indexSpec,
+ segmentWriteOutMedium,
+ progressIndicator,
+ closer
+ );
+ serializer = defaultSerializer;
+
+ final ComplexColumnPartSerde partSerde =
ComplexColumnPartSerde.serializerBuilder()
+
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
+
.withDelegate(serializer)
+ .build();
+ descriptorBuilder.setValueType(ValueType.COMPLEX)
+ .setHasMultipleValues(false)
+ .addSerde(partSerde);
+
+ defaultSerializer.open();
+ defaultSerializer.serializeFields(mergedFields);
+
+ int stringCardinality;
+ int longCardinality;
+ int doubleCardinality;
+ int arrayCardinality;
+ if (numMergeIndex == 1) {
+ defaultSerializer.serializeDictionaries(
+ sortedLookup.getSortedStrings(),
+ sortedLookup.getSortedLongs(),
+ sortedLookup.getSortedDoubles(),
+ () -> new ArrayDictionaryMergingIterator(
+ sortedArrayLookups,
+ defaultSerializer.getGlobalLookup()
+ )
+ );
+ stringCardinality = sortedLookup.getStringCardinality();
+ longCardinality = sortedLookup.getLongCardinality();
+ doubleCardinality = sortedLookup.getDoubleCardinality();
+ arrayCardinality = sortedLookup.getArrayCardinality();
} else {
- throw new ISE("Unable to merge columns of unsupported adapter %s",
adapter.getClass());
+ final SimpleDictionaryMergingIterator<String> stringIterator = new
SimpleDictionaryMergingIterator<>(
+ sortedLookups,
+ STRING_MERGING_COMPARATOR
+ );
+ final SimpleDictionaryMergingIterator<Long> longIterator = new
SimpleDictionaryMergingIterator<>(
+ sortedLongLookups,
+ LONG_MERGING_COMPARATOR
+ );
+ final SimpleDictionaryMergingIterator<Double> doubleIterator = new
SimpleDictionaryMergingIterator<>(
+ sortedDoubleLookups,
+ DOUBLE_MERGING_COMPARATOR
+ );
+ final ArrayDictionaryMergingIterator arrayIterator = new
ArrayDictionaryMergingIterator(
+ sortedArrayLookups,
+ defaultSerializer.getGlobalLookup()
+ );
+ defaultSerializer.serializeDictionaries(
+ () -> stringIterator,
+ () -> longIterator,
+ () -> doubleIterator,
+ () -> arrayIterator
+ );
+ stringCardinality = stringIterator.getCardinality();
+ longCardinality = longIterator.getCardinality();
+ doubleCardinality = doubleIterator.getCardinality();
+ arrayCardinality = arrayIterator.getCardinality();
}
- boolean allNulls = allNull(dimValues.getSortedStrings()) &&
- allNull(dimValues.getSortedLongs()) &&
- allNull(dimValues.getSortedDoubles());
- sortedLookup = dimValues;
- if (!allNulls) {
- sortedLookups[i] = dimValues.getSortedStrings();
- sortedLongLookups[i] = dimValues.getSortedLongs();
- sortedDoubleLookups[i] = dimValues.getSortedDoubles();
- numMergeIndex++;
- }
- }
-
- int cardinality = 0;
- descriptorBuilder = new ColumnDescriptor.Builder();
-
- final NestedDataColumnSerializer defaultSerializer = new
NestedDataColumnSerializer(
- name,
- indexSpec,
- segmentWriteOutMedium,
- progressIndicator,
- closer
- );
- serializer = defaultSerializer;
-
- final ComplexColumnPartSerde partSerde =
ComplexColumnPartSerde.serializerBuilder()
-
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
-
.withDelegate(serializer)
- .build();
- descriptorBuilder.setValueType(ValueType.COMPLEX)
- .setHasMultipleValues(false)
- .addSerde(partSerde);
-
- defaultSerializer.open();
- defaultSerializer.serializeFields(mergedFields);
-
- if (numMergeIndex > 1) {
- SimpleDictionaryMergingIterator<String> dictionaryMergeIterator = new
SimpleDictionaryMergingIterator<>(
- sortedLookups,
- STRING_MERGING_COMPARATOR
+ log.debug(
+ "Completed dim[%s] conversions with string cardinality[%,d], long
cardinality[%,d], double cardinality[%,d], array cardinality[%,d] in %,d
millis.",
+ name,
+ stringCardinality,
+ longCardinality,
+ doubleCardinality,
+ arrayCardinality,
+ System.currentTimeMillis() - dimStartTime
);
- SimpleDictionaryMergingIterator<Long> longDictionaryMergeIterator = new
SimpleDictionaryMergingIterator<>(
- sortedLongLookups,
- LONG_MERGING_COMPARATOR
- );
- SimpleDictionaryMergingIterator<Double> doubleDictionaryMergeIterator =
new SimpleDictionaryMergingIterator<>(
- sortedDoubleLookups,
- DOUBLE_MERGING_COMPARATOR
- );
- defaultSerializer.serializeStringDictionary(() ->
dictionaryMergeIterator);
- defaultSerializer.serializeLongDictionary(() ->
longDictionaryMergeIterator);
- defaultSerializer.serializeDoubleDictionary(() ->
doubleDictionaryMergeIterator);
- cardinality = dictionaryMergeIterator.getCardinality();
- } else if (numMergeIndex == 1) {
-
defaultSerializer.serializeStringDictionary(sortedLookup.getSortedStrings());
- defaultSerializer.serializeLongDictionary(sortedLookup.getSortedLongs());
-
defaultSerializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles());
- cardinality = sortedLookup.size();
}
-
- log.debug(
- "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
- name,
- cardinality,
- System.currentTimeMillis() - dimStartTime
- );
+ catch (Throwable ioe) {
Review Comment:
Hmm, I don't remember why I added this. I think I was debugging something
and needed an easy place to catch failures so I could tell what was failing for
which column; catching here seemed the best way to see what was messed up.
It could probably be narrowed down to `IOException` and just rethrown.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]