This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.20
in repository https://gitbox.apache.org/repos/asf/drill.git
commit f9b88d346a9b99c525e96d5cd7079dd550112f36 Author: Volodymyr Vysotskyi <[email protected]> AuthorDate: Tue Jul 26 04:55:54 2022 +0300 DRILL-8272: Skip MAP column without children when creating parquet tables (#2613) --- .../codegen/templates/AbstractRecordWriter.java | 7 + .../codegen/templates/EventBasedRecordWriter.java | 6 +- .../templates/ParquetOutputRecordWriter.java | 368 +++++++++------------ .../src/main/codegen/templates/RecordWriter.java | 6 + .../exec/store/parquet/ParquetRecordWriter.java | 32 +- .../physical/impl/writer/TestParquetWriter.java | 19 ++ 6 files changed, 220 insertions(+), 218 deletions(-) diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java index 6982c7586e..d26d35aa81 100644 --- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java @@ -24,6 +24,8 @@ import java.lang.UnsupportedOperationException; package org.apache.drill.exec.store; import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.planner.physical.WriterPrel; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.vector.BitVector; import org.apache.drill.exec.vector.BitVector.Accessor; @@ -96,4 +98,9 @@ public abstract class AbstractRecordWriter implements RecordWriter { public void postProcessing() throws IOException { // no op } + + @Override + public boolean supportsField(MaterializedField field) { + return !field.getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD); + } } diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java index a541c63fd6..0d2672e861 100644 --- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -import org.apache.drill.exec.planner.physical.WriterPrel; <@pp.dropOutputFile /> <@pp.changeOutputFile name="org/apache/drill/exec/store/EventBasedRecordWriter.java" /> @@ -27,7 +26,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.planner.physical.WriterPrel; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.complex.impl.UnionReader; @@ -81,7 +79,7 @@ public class EventBasedRecordWriter { try { int fieldId = 0; for (VectorWrapper w : batch) { - if (w.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) { + if (!recordWriter.supportsField(w.getField())) { continue; } FieldReader reader = w.getValueVector().getReader(); @@ -178,4 +176,4 @@ public class EventBasedRecordWriter { } throw new UnsupportedOperationException(); } -} \ No newline at end of file +} diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java index 1f6e467873..3658ecc82f 100644 --- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java @@ -94,15 +94,157 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp protected abstract PrimitiveType getPrimitiveType(MaterializedField field); + public abstract class BaseFieldConverter extends FieldConverter { + + public BaseFieldConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + } + + public abstract void read(); + + public abstract void read(int i); + + public abstract void consume(); + + @Override + public void writeField() throws IOException { + read(); + consume(); + } + } + + public class NullableFieldConverter extends FieldConverter { + private BaseFieldConverter delegate; + + public NullableFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) { + super(fieldId, fieldName, reader); + this.delegate = delegate; + } + + @Override + public void writeField() throws IOException { + if (!reader.isSet()) { + return; + } + consumer.startField(fieldName, fieldId); + delegate.writeField(); + consumer.endField(fieldName, fieldId); + } + + public void setPosition(int index) { + delegate.setPosition(index); + } + + public void startField() throws IOException { + delegate.startField(); + } + + public void endField() throws IOException { + delegate.endField(); + } + } + + public class RequiredFieldConverter extends FieldConverter { + private BaseFieldConverter delegate; + + public RequiredFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) { + super(fieldId, fieldName, reader); + this.delegate = delegate; + } + + @Override + public void writeField() throws IOException { + consumer.startField(fieldName, fieldId); + delegate.writeField(); + consumer.endField(fieldName, fieldId); + } + + public void setPosition(int index) { + delegate.setPosition(index); + } + + public void startField() throws IOException { + delegate.startField(); + } + + public void endField() throws IOException { + delegate.endField(); + } + } + + public class RepeatedFieldConverter extends FieldConverter { + + private BaseFieldConverter delegate; + + public 
RepeatedFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) { + super(fieldId, fieldName, reader); + this.delegate = delegate; + } + + @Override + public void writeField() throws IOException { + // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements + if (reader.size() == 0) { + return; + } + consumer.startField(fieldName, fieldId); + for (int i = 0; i < reader.size(); i++) { + delegate.read(i); + delegate.consume(); + } + consumer.endField(fieldName, fieldId); + } + + @Override + public void writeListField() { + if (reader.size() == 0) { + return; + } + consumer.startField(LIST, ZERO_IDX); + for (int i = 0; i < reader.size(); i++) { + consumer.startGroup(); + consumer.startField(ELEMENT, ZERO_IDX); + + delegate.read(i); + delegate.consume(); + + consumer.endField(ELEMENT, ZERO_IDX); + consumer.endGroup(); + } + consumer.endField(LIST, ZERO_IDX); + } + + public void setPosition(int index) { + delegate.setPosition(index); + } + + public void startField() throws IOException { + delegate.startField(); + } + + public void endField() throws IOException { + delegate.endField(); + } + } + <#list vv.types as type> <#list type.minor as minor> <#list vv.modes as mode> @Override public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) { - return new ${mode.prefix}${minor.class}ParquetConverter(fieldId, fieldName, reader); + BaseFieldConverter converter = new ${minor.class}ParquetConverter(fieldId, fieldName, reader); + <#if mode.prefix == "Nullable"> + return new NullableFieldConverter(fieldId, fieldName, reader, converter); + <#elseif mode.prefix == "Repeated"> + return new RepeatedFieldConverter(fieldId, fieldName, reader, converter); + <#else> + return new RequiredFieldConverter(fieldId, fieldName, reader, converter); + </#if> } - public class ${mode.prefix}${minor.class}ParquetConverter extends FieldConverter { + </#list> + + public class ${minor.class}ParquetConverter extends BaseFieldConverter { private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder(); <#if minor.class?contains("Interval")> private final byte[] output = new byte[12]; @@ -110,7 +252,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp private final DecimalValueWriter decimalValueWriter; </#if> - public ${mode.prefix}${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) { + public ${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) { super(fieldId, fieldName, reader); <#if minor.class == "VarDecimal"> decimalValueWriter = DecimalValueWriter. 
@@ -119,20 +261,17 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp } @Override - public void writeField() throws IOException { - <#if mode.prefix == "Nullable" > - if (!reader.isSet()) { - return; - } - <#elseif mode.prefix == "Repeated" > - // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements - if (reader.size() == 0) { - return; + public void read() { + reader.read(holder); + } + + @Override + public void read(int i) { + reader.read(i, holder); } - consumer.startField(fieldName, fieldId); - for (int i = 0; i < reader.size(); i++) { - </#if> + @Override + public void consume() { <#if minor.class == "TinyInt" || minor.class == "UInt1" || minor.class == "UInt2" || @@ -141,80 +280,28 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp minor.class == "Time" || minor.class == "Decimal9" || minor.class == "UInt4"> - <#if mode.prefix == "Repeated" > - reader.read(i, holder); - consumer.addInteger(holder.value); - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); - consumer.addInteger(holder.value); - consumer.endField(fieldName, fieldId); - </#if> + consumer.addInteger(holder.value); <#elseif minor.class == "Float4"> - <#if mode.prefix == "Repeated" > - reader.read(i, holder); - consumer.addFloat(holder.value); - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); - consumer.addFloat(holder.value); - consumer.endField(fieldName, fieldId); - </#if> + consumer.addFloat(holder.value); <#elseif minor.class == "BigInt" || minor.class == "Decimal18" || minor.class == "TimeStamp" || minor.class == "UInt8"> - <#if mode.prefix == "Repeated" > - reader.read(i, holder); - consumer.addLong(holder.value); - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); - consumer.addLong(holder.value); - consumer.endField(fieldName, fieldId); - </#if> + consumer.addLong(holder.value); <#elseif minor.class == "Date"> - <#if mode.prefix == "Repeated" > - reader.read(i, holder); - consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY)); - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); // convert from internal Drill date format to Julian Day centered around Unix Epoc consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY)); - consumer.endField(fieldName, fieldId); - </#if> <#elseif minor.class == "Float8"> - <#if mode.prefix == "Repeated" > - reader.read(i, holder); - consumer.addDouble(holder.value); - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); - consumer.addDouble(holder.value); - consumer.endField(fieldName, fieldId); - </#if> + consumer.addDouble(holder.value); <#elseif minor.class == "Bit"> - <#if mode.prefix == "Repeated" > - reader.read(i, holder); - consumer.addBoolean(holder.value == 1); - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); - consumer.addBoolean(holder.value == 1); - consumer.endField(fieldName, fieldId); - </#if> + consumer.addBoolean(holder.value == 1); <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse"> - <#if mode.prefix == "Repeated" > - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); byte[] bytes = DecimalUtility.getBigDecimalFromSparse( holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray(); byte[] output = new 
byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})]; @@ -225,11 +312,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp } System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length); consumer.addBinary(Binary.fromByteArray(output)); - consumer.endField(fieldName, fieldId); - </#if> <#elseif minor.class?contains("Interval")> - consumer.startField(fieldName, fieldId); - reader.read(holder); <#if minor.class == "IntervalDay"> Arrays.fill(output, 0, 4, (byte) 0); IntervalUtility.intToLEByteArray(holder.days, output, 4); @@ -244,143 +327,16 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8); </#if> consumer.addBinary(Binary.fromByteArray(output)); - consumer.endField(fieldName, fieldId); - - <#elseif - minor.class == "TimeTZ" || - minor.class == "Decimal28Dense" || - minor.class == "Decimal38Dense"> - <#if mode.prefix == "Repeated" > - <#else> - - </#if> - <#elseif minor.class == "VarChar" || minor.class == "Var16Char" - || minor.class == "VarBinary" || minor.class == "VarDecimal"> - <#if mode.prefix == "Repeated"> - reader.read(i, holder); - <#if minor.class == "VarDecimal"> - decimalValueWriter.writeValue(consumer, holder.buffer, - holder.start, holder.end, reader.getField().getPrecision()); - <#else> - consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start))); - </#if> - <#else> - reader.read(holder); - consumer.startField(fieldName, fieldId); - <#if minor.class == "VarDecimal"> + <#elseif minor.class == "VarDecimal"> decimalValueWriter.writeValue(consumer, holder.buffer, holder.start, holder.end, reader.getField().getPrecision()); - <#else> + <#elseif minor.class == "VarChar" || minor.class == "Var16Char" + || minor.class == "VarBinary"> consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start))); - </#if> - consumer.endField(fieldName, fieldId); - </#if> </#if> - <#if mode.prefix == "Repeated"> } - consumer.endField(fieldName, fieldId); - </#if> - } - - <#if mode.prefix == "Repeated"> - @Override - public void writeListField() { - if (reader.size() == 0) { - return; - } - consumer.startField(LIST, ZERO_IDX); - for (int i = 0; i < reader.size(); i++) { - consumer.startGroup(); - consumer.startField(ELEMENT, ZERO_IDX); - - <#if minor.class == "TinyInt" || - minor.class == "UInt1" || - minor.class == "UInt2" || - minor.class == "SmallInt" || - minor.class == "Int" || - minor.class == "Time" || - minor.class == "Decimal9" || - minor.class == "UInt4"> - reader.read(i, holder); - consumer.addInteger(holder.value); - <#elseif minor.class == "Float4"> - reader.read(i, holder); - consumer.addFloat(holder.value); - <#elseif minor.class == "BigInt" || - minor.class == "Decimal18" || - minor.class == "TimeStamp" || - minor.class == "UInt8"> - reader.read(i, holder); - consumer.addLong(holder.value); - <#elseif minor.class == "Date"> - reader.read(i, holder); - consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY)); - <#elseif minor.class == "Float8"> - reader.read(i, holder); - consumer.addDouble(holder.value); - <#elseif minor.class == "Bit"> - reader.read(i, holder); - consumer.addBoolean(holder.value == 1); - <#elseif minor.class == "Decimal28Sparse" || - minor.class == "Decimal38Sparse"> - <#if mode.prefix == "Repeated" > - <#else> - consumer.startField(fieldName, fieldId); - reader.read(holder); - 
byte[] bytes = DecimalUtility.getBigDecimalFromSparse( - holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray(); - byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})]; - if (holder.getSign(holder.start, holder.buffer)) { - Arrays.fill(output, 0, output.length - bytes.length, (byte) -1); - } else { - Arrays.fill(output, 0, output.length - bytes.length, (byte) 0); - } - System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length); - consumer.addBinary(Binary.fromByteArray(output)); - consumer.endField(fieldName, fieldId); - </#if> - <#elseif minor.class?contains("Interval")> - consumer.startField(fieldName, fieldId); - reader.read(holder); - <#if minor.class == "IntervalDay"> - Arrays.fill(output, 0, 4, (byte) 0); - IntervalUtility.intToLEByteArray(holder.days, output, 4); - IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8); - <#elseif minor.class == "IntervalYear"> - IntervalUtility.intToLEByteArray(holder.value, output, 0); - Arrays.fill(output, 4, 8, (byte) 0); - Arrays.fill(output, 8, 12, (byte) 0); - <#elseif minor.class == "Interval"> - IntervalUtility.intToLEByteArray(holder.months, output, 0); - IntervalUtility.intToLEByteArray(holder.days, output, 4); - IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8); - </#if> - consumer.addBinary(Binary.fromByteArray(output)); - consumer.endField(fieldName, fieldId); - - <#elseif - minor.class == "TimeTZ" || - minor.class == "Decimal28Dense" || - minor.class == "Decimal38Dense"> - <#elseif minor.class == "VarChar" || minor.class == "Var16Char" - || minor.class == "VarBinary" || minor.class == "VarDecimal"> - reader.read(i, holder); - <#if minor.class == "VarDecimal"> - decimalValueWriter.writeValue(consumer, holder.buffer, holder.start, holder.end, - reader.getField().getPrecision()); - <#else> - consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start))); - </#if> - </#if> - consumer.endField(ELEMENT, ZERO_IDX); - consumer.endGroup(); - } - consumer.endField(LIST, ZERO_IDX); - } - </#if> } - </#list> </#list> </#list> diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java index d11c2a14ab..444772b93a 100644 --- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java @@ -23,6 +23,7 @@ package org.apache.drill.exec.store; import org.apache.drill.exec.expr.holders.*; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.vector.complex.reader.FieldReader; @@ -94,4 +95,9 @@ public interface RecordWriter { void postProcessing() throws IOException; void abort() throws IOException; void cleanup() throws IOException; + + /** + * Checks whether this writer supports writing of the given field. 
+ */ + boolean supportsField(MaterializedField field); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index a4b68da561..bcea784051 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; @@ -249,10 +250,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { } } - private void newSchema() throws IOException { + private void newSchema() { List<Type> types = new ArrayList<>(); for (MaterializedField field : batchSchema) { - if (field.getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) { + if (!supportsField(field)) { continue; } types.add(getType(field)); @@ -297,6 +298,13 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { setUp(schema, consumer); } + @Override + public boolean supportsField(MaterializedField field) { + return super.supportsField(field) + && (field.getType().getMinorType() != MinorType.MAP || field.getChildCount() > 0); + } + + @Override protected PrimitiveType getPrimitiveType(MaterializedField field) { MinorType minorType = field.getType().getMinorType(); String name = field.getName(); @@ -513,13 +521,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { @Override public void writeField() throws IOException { - consumer.startField(fieldName, fieldId); - consumer.startGroup(); - for (FieldConverter converter : converters) { - converter.writeField(); + if (!converters.isEmpty()) { + consumer.startField(fieldName, fieldId); + consumer.startGroup(); + for (FieldConverter converter : converters) { + converter.writeField(); + } + consumer.endGroup(); + consumer.endField(fieldName, fieldId); } - consumer.endGroup(); - consumer.endField(fieldName, fieldId); } } @@ -683,11 +693,17 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { @Override public void startRecord() throws IOException { + if (CollectionUtils.isEmpty(schema.getFields())) { + return; + } consumer.startMessage(); } @Override public void endRecord() throws IOException { + if (CollectionUtils.isEmpty(schema.getFields())) { + return; + } consumer.endMessage(); // we wait until there is at least one record before creating the parquet file diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 28b009e869..707a552819 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.writer; import org.apache.calcite.util.Pair; +import org.apache.commons.io.FileUtils; import org.apache.drill.categories.ParquetTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.categories.UnlikelyTest; @@ -57,6 +58,7 @@ import java.io.File; import 
java.io.FileWriter; import java.io.IOException; import java.math.BigDecimal; +import java.nio.charset.Charset; import java.nio.file.Paths; import java.time.LocalDate; import java.util.ArrayList; @@ -1513,6 +1515,23 @@ public class TestParquetWriter extends ClusterTest { } } + @Test + public void testResultWithEmptyMap() throws Exception { + String fileName = "emptyMap.json"; + + FileUtils.writeStringToFile(new File(dirTestWatcher.getRootDir(), fileName), + "{\"sample\": {}, \"a\": \"a\"}", Charset.defaultCharset()); + + run("create table dfs.tmp.t1 as SELECT * from dfs.`%s` t", fileName); + + testBuilder() + .sqlQuery("select * from dfs.tmp.t1") + .unOrdered() + .baselineColumns("a") + .baselineValues("a") + .go(); + } + /** * Checks that specified parquet table contains specified columns with specified types. *
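For readers skimming the diff above, the functional core of DRILL-8272 is the new RecordWriter.supportsField(MaterializedField) hook: EventBasedRecordWriter and ParquetRecordWriter.newSchema() both consult it, AbstractRecordWriter rejects only the internal partition-comparator column, and ParquetRecordWriter additionally rejects MAP columns with no children, so an empty map such as {"sample": {}} is dropped from the created table (this is what testResultWithEmptyMap verifies). Below is a minimal, self-contained sketch of that check, not Drill's real API: the Field class, the PARTITION_COMPARATOR constant value, and the baseSupports/parquetSupports helpers are hypothetical stand-ins for MaterializedField, WriterPrel.PARTITION_COMPARATOR_FIELD, and the two supportsField overrides.

```java
import java.util.List;

public class SupportsFieldSketch {

  // Placeholder for WriterPrel.PARTITION_COMPARATOR_FIELD; the real constant value differs.
  static final String PARTITION_COMPARATOR = "__partition_comparator__";

  // Stand-in for MaterializedField, reduced to the properties the check needs.
  static final class Field {
    final String name;
    final boolean isMap;
    final int childCount;

    Field(String name, boolean isMap, int childCount) {
      this.name = name;
      this.isMap = isMap;
      this.childCount = childCount;
    }
  }

  // AbstractRecordWriter behaviour: skip only the internal partition-comparator column.
  static boolean baseSupports(Field field) {
    return !field.name.equalsIgnoreCase(PARTITION_COMPARATOR);
  }

  // ParquetRecordWriter behaviour: additionally skip MAP columns with no children,
  // because an empty group is not a representable Parquet type.
  static boolean parquetSupports(Field field) {
    return baseSupports(field) && (!field.isMap || field.childCount > 0);
  }

  public static void main(String[] args) {
    // Schema produced by reading {"sample": {}, "a": "a"} from JSON.
    List<Field> schema = List.of(
        new Field("sample", true, 0),   // empty MAP: skipped, as in testResultWithEmptyMap
        new Field("a", false, 0));      // ordinary column: written

    for (Field f : schema) {
      System.out.println(f.name + " -> " + (parquetSupports(f) ? "written" : "skipped"));
    }
  }
}
```

Running the sketch prints "sample -> skipped" and "a -> written", mirroring the test's baseline where only column a comes back from the table created over the empty-map JSON.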

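The other large part of the diff, the ParquetOutputRecordWriter.java template rewrite, splits each generated per-type converter into read()/read(i)/consume() and moves the Required/Nullable/Repeated bracketing into plain delegating wrappers, so the startField/endField plumbing is written once instead of being expanded for every mode by FreeMarker. A condensed, non-FreeMarker sketch of that shape follows; Sink, IntColumn, and writeRepeated are hypothetical stand-ins, not Drill or Parquet classes.

```java
public class ConverterSketch {

  interface Sink {                       // stand-in for Parquet's RecordConsumer
    void startField(String name);
    void endField(String name);
    void addInteger(int value);
  }

  abstract static class BaseConverter {
    abstract void read();                // load the current value into a holder
    abstract void read(int i);           // load the i-th element of a repeated value
    abstract void consume();             // emit the held value to the sink
  }

  // Per-type converter, analogous to the generated ${minor.class}ParquetConverter.
  static class IntColumn extends BaseConverter {
    final int[] values;
    final Sink sink;
    int holder;

    IntColumn(int[] values, Sink sink) {
      this.values = values;
      this.sink = sink;
    }

    void read() { holder = values[0]; }
    void read(int i) { holder = values[i]; }
    void consume() { sink.addInteger(holder); }
  }

  // Mode wrapper, analogous to RepeatedFieldConverter: empty lists never start a field.
  static void writeRepeated(String name, int size, BaseConverter delegate, Sink sink) {
    if (size == 0) {
      return;
    }
    sink.startField(name);
    for (int i = 0; i < size; i++) {
      delegate.read(i);
      delegate.consume();
    }
    sink.endField(name);
  }

  public static void main(String[] args) {
    int[] data = {1, 2, 3};
    Sink sink = new Sink() {
      public void startField(String n) { System.out.println("start " + n); }
      public void endField(String n) { System.out.println("end " + n); }
      public void addInteger(int v) { System.out.println("  add " + v); }
    };
    writeRepeated("ints", data.length, new IntColumn(data, sink), sink);
    writeRepeated("empty", 0, new IntColumn(new int[0], sink), sink); // prints nothing
  }
}
```

Because the per-type template now only has to generate the consume() body, the mode-specific duplication disappears, which is what accounts for the large net shrinkage of ParquetOutputRecordWriter.java in the diffstat.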