rdblue commented on code in PR #3202: URL: https://github.com/apache/parquet-java/pull/3202#discussion_r2070936092
########## parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java: ########## @@ -0,0 +1,654 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.variant; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; + +/** + * Builder for creating Variant value and metadata. + */ +public class VariantBuilder { + /** The buffer for building the Variant value. The first `writePos` bytes have been written. */ + protected byte[] writeBuffer = new byte[1024]; + + protected int writePos = 0; + /** The dictionary for mapping keys to monotonically increasing ids. */ + private final HashMap<String, Integer> dictionary = new HashMap<>(); + /** The keys in the dictionary, in id order. */ + private final ArrayList<byte[]> dictionaryKeys = new ArrayList<>(); + + /** The number of values appended to this builder. */ + protected long numValues = 0; + + /** + * These are used to build nested objects and arrays, via startObject() and startArray(). + * Only one of these can be non-null at a time. If one of these is non-null, then no append() + * methods can be called on this builder, until endObject() or endArray() is called. + */ + protected VariantObjectBuilder objectBuilder = null; + + protected VariantArrayBuilder arrayBuilder = null; + + /** + * Creates a VariantBuilder. + */ + public VariantBuilder() {} + + /** + * @return the Variant value + */ + public Variant build() { + if (objectBuilder != null) { + throw new IllegalStateException( + "Cannot call build() while an object is being built. Must call endObject() first."); + } + if (arrayBuilder != null) { + throw new IllegalStateException( + "Cannot call build() while an array is being built. Must call endArray() first."); + } + int numKeys = dictionaryKeys.size(); + // Use long to avoid overflow in accumulating lengths. + long dictionaryTotalDataSize = 0; + for (byte[] key : dictionaryKeys) { + dictionaryTotalDataSize += key.length; + } + // Determine the number of bytes required per offset entry. + // The largest offset is the one-past-the-end value, which is total data size. It's very + // unlikely that the number of keys could be larger, but incorporate that into the calculation + // in case of pathological data. + long maxSize = Math.max(dictionaryTotalDataSize, numKeys); + int offsetSize = getMinIntegerSize((int) maxSize); + + int offsetListOffset = 1 + offsetSize; + int dataOffset = offsetListOffset + (numKeys + 1) * offsetSize; + long metadataSize = dataOffset + dictionaryTotalDataSize; + + byte[] metadata = new byte[(int) metadataSize]; + // Only unsorted dictionary keys are supported. + // TODO: Support sorted dictionary keys. + int headerByte = VariantUtil.VERSION | ((offsetSize - 1) << 6); + VariantUtil.writeLong(metadata, 0, headerByte, 1); + VariantUtil.writeLong(metadata, 1, numKeys, offsetSize); + int currentOffset = 0; + for (int i = 0; i < numKeys; ++i) { + VariantUtil.writeLong(metadata, offsetListOffset + i * offsetSize, currentOffset, offsetSize); + byte[] key = dictionaryKeys.get(i); + System.arraycopy(key, 0, metadata, dataOffset + currentOffset, key.length); + currentOffset += key.length; + } + VariantUtil.writeLong(metadata, offsetListOffset + numKeys * offsetSize, currentOffset, offsetSize); + // Copying the data to a new buffer, to retain only the required data length, not the capacity. + return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); + } + + /** + * Appends a string value to the Variant builder. + * @param str the string value to append + */ + public void appendString(String str) { + onAppend(); + byte[] data = str.getBytes(StandardCharsets.UTF_8); + boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE; + checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + data.length); + if (longStr) { + writeBuffer[writePos] = VariantUtil.HEADER_LONG_STRING; + writePos += 1; + VariantUtil.writeLong(writeBuffer, writePos, data.length, VariantUtil.U32_SIZE); + writePos += VariantUtil.U32_SIZE; + } else { + writeBuffer[writePos] = VariantUtil.shortStrHeader(data.length); + writePos += 1; + } + System.arraycopy(data, 0, writeBuffer, writePos, data.length); + writePos += data.length; + numValues++; + } + + /** + * Appends a null value to the Variant builder. + */ + public void appendNull() { + onAppend(); + checkCapacity(1); + writeBuffer[writePos] = VariantUtil.HEADER_NULL; + writePos += 1; + numValues++; + } + + /** + * Appends a boolean value to the Variant builder. + * @param b the boolean value to append + */ + public void appendBoolean(boolean b) { + onAppend(); + checkCapacity(1); + writeBuffer[writePos] = b ? VariantUtil.HEADER_TRUE : VariantUtil.HEADER_FALSE; + writePos += 1; + numValues++; + } + + /** + * Appends a long value to the variant builder. + * @param l the long value to append + */ + public void appendLong(long l) { + onAppend(); + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_INT64; + VariantUtil.writeLong(writeBuffer, writePos + 1, l, 8); + writePos += 9; + numValues++; + } + + /** + * Appends an int value to the variant builder. + * @param i the int to append + */ + public void appendInt(int i) { + onAppend(); + checkCapacity(1 /* header size */ + 4); + writeBuffer[writePos] = VariantUtil.HEADER_INT32; + VariantUtil.writeLong(writeBuffer, writePos + 1, i, 4); + writePos += 5; + numValues++; + } + + /** + * Appends a short value to the variant builder. + * @param s the short to append + */ + public void appendShort(short s) { + onAppend(); + checkCapacity(1 /* header size */ + 2); + writeBuffer[writePos] = VariantUtil.HEADER_INT16; + VariantUtil.writeLong(writeBuffer, writePos + 1, s, 2); + writePos += 3; + numValues++; + } + + /** + * Appends a byte value to the variant builder. + * @param b the byte to append + */ + public void appendByte(byte b) { + onAppend(); + checkCapacity(1 /* header size */ + 1); + writeBuffer[writePos] = VariantUtil.HEADER_INT8; + VariantUtil.writeLong(writeBuffer, writePos + 1, b, 1); + writePos += 2; + numValues++; + } + + /** + * Appends a double value to the variant builder. + * @param d the double to append + */ + public void appendDouble(double d) { + onAppend(); + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_DOUBLE; + VariantUtil.writeLong(writeBuffer, writePos + 1, Double.doubleToLongBits(d), 8); + writePos += 9; + numValues++; + } + + /** + * Appends a decimal value to the variant builder. The actual encoded decimal type depends on the + * precision and scale of the decimal value. + * @param d the decimal value to append + */ + public void appendDecimal(BigDecimal d) { + onAppend(); + BigInteger unscaled = d.unscaledValue(); + if (d.scale() <= VariantUtil.MAX_DECIMAL4_PRECISION && d.precision() <= VariantUtil.MAX_DECIMAL4_PRECISION) { + checkCapacity(2 /* header and scale size */ + 4); + writeBuffer[writePos] = VariantUtil.HEADER_DECIMAL4; + writeBuffer[writePos + 1] = (byte) d.scale(); + VariantUtil.writeLong(writeBuffer, writePos + 2, unscaled.intValueExact(), 4); + writePos += 6; + } else if (d.scale() <= VariantUtil.MAX_DECIMAL8_PRECISION + && d.precision() <= VariantUtil.MAX_DECIMAL8_PRECISION) { + checkCapacity(2 /* header and scale size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_DECIMAL8; + writeBuffer[writePos + 1] = (byte) d.scale(); + VariantUtil.writeLong(writeBuffer, writePos + 2, unscaled.longValueExact(), 8); + writePos += 10; + } else { + assert d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION + && d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION; + checkCapacity(2 /* header and scale size */ + 16); + writeBuffer[writePos] = VariantUtil.HEADER_DECIMAL16; + writeBuffer[writePos + 1] = (byte) d.scale(); + writePos += 2; + // `toByteArray` returns a big-endian representation. We need to copy it reversely and sign + // extend it to 16 bytes. + byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { + writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { + writeBuffer[writePos + i] = sign; + } + writePos += 16; + } + numValues++; + } + + /** + * Appends a date value to the variant builder. The date is represented as the number of days + * since the epoch. + * @param daysSinceEpoch the number of days since the epoch + */ + public void appendDate(int daysSinceEpoch) { + onAppend(); + checkCapacity(1 /* header size */ + 4); + writeBuffer[writePos] = VariantUtil.HEADER_DATE; + VariantUtil.writeLong(writeBuffer, writePos + 1, daysSinceEpoch, 4); + writePos += 5; + numValues++; + } + + /** + * Appends a TimestampTz value to the variant builder. The timestamp is represented as the number + * of microseconds since the epoch. + * @param microsSinceEpoch the number of microseconds since the epoch + */ + public void appendTimestampTz(long microsSinceEpoch) { + onAppend(); + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_TIMESTAMP_TZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, microsSinceEpoch, 8); + writePos += 9; + numValues++; + } + + /** + * Appends a TimestampNtz value to the variant builder. The timestamp is represented as the number + * of microseconds since the epoch. + * @param microsSinceEpoch the number of microseconds since the epoch + */ + public void appendTimestampNtz(long microsSinceEpoch) { + onAppend(); + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_TIMESTAMP_NTZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, microsSinceEpoch, 8); + writePos += 9; + numValues++; + } + + /** + * Appends a Time value to the variant builder. The time is represented as the number of + * microseconds since midnight. + * @param microsSinceMidnight the number of microseconds since midnight + */ + public void appendTime(long microsSinceMidnight) { + if (microsSinceMidnight < 0) { + throw new IllegalArgumentException( + String.format("Time value (%d) cannot be negative.", microsSinceMidnight)); + } + onAppend(); + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_TIME; + VariantUtil.writeLong(writeBuffer, writePos + 1, microsSinceMidnight, 8); + writePos += 9; + numValues++; + } + + /** + * Appends a TimestampNanosTz value to the variant builder. The timestamp is represented as the + * number of nanoseconds since the epoch. + * @param nanosSinceEpoch the number of nanoseconds since the epoch + */ + public void appendTimestampNanosTz(long nanosSinceEpoch) { + onAppend(); + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_TIMESTAMP_NANOS_TZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, nanosSinceEpoch, 8); + writePos += 9; + numValues++; + } + + /** + * Appends a TimestampNanosNtz value to the variant builder. The timestamp is represented as the + * number of nanoseconds since the epoch. + * @param nanosSinceEpoch the number of nanoseconds since the epoch + */ + public void appendTimestampNanosNtz(long nanosSinceEpoch) { + onAppend(); + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HEADER_TIMESTAMP_NANOS_NTZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, nanosSinceEpoch, 8); + writePos += 9; + numValues++; + } + + /** + * Appends a float value to the variant builder. + * @param f the float to append + */ + public void appendFloat(float f) { + onAppend(); + checkCapacity(1 /* header size */ + 4); + writeBuffer[writePos] = VariantUtil.HEADER_FLOAT; + VariantUtil.writeLong(writeBuffer, writePos + 1, Float.floatToIntBits(f), 8); + writePos += 5; + numValues++; + } + + /** + * Appends binary data to the variant builder. + * @param binary the binary data to append + */ + public void appendBinary(ByteBuffer binary) { + onAppend(); + int binarySize = binary.remaining(); + checkCapacity(1 /* header size */ + VariantUtil.U32_SIZE + binarySize); + writeBuffer[writePos] = VariantUtil.HEADER_BINARY; + writePos += 1; + VariantUtil.writeLong(writeBuffer, writePos, binarySize, VariantUtil.U32_SIZE); + writePos += VariantUtil.U32_SIZE; + ByteBuffer.wrap(writeBuffer, writePos, binarySize).put(binary); + writePos += binarySize; + numValues++; + } + + /** + * Appends a UUID value to the variant builder. + * @param uuid the UUID to append + */ + public void appendUUID(java.util.UUID uuid) { + onAppend(); + checkCapacity(1 /* header size */ + VariantUtil.UUID_SIZE); + writeBuffer[writePos] = VariantUtil.HEADER_UUID; + writePos += 1; + + ByteBuffer bb = + ByteBuffer.wrap(writeBuffer, writePos, VariantUtil.UUID_SIZE).order(ByteOrder.BIG_ENDIAN); + bb.putLong(uuid.getMostSignificantBits()); + bb.putLong(uuid.getLeastSignificantBits()); + writePos += VariantUtil.UUID_SIZE; + numValues++; + } + + /** + * Starts appending an object to this variant builder. The returned VariantObjectBuilder is used + * to append object keys and values. startObject() must be called before endObject(). + * No append*() methods can be called in between startObject() and endObject(). + * + * Example usage: + * VariantBuilder builder = new VariantBuilder(); + * VariantObjectBuilder objBuilder = builder.startObject(); + * objBuilder.appendKey("key1"); + * objBuilder.appendString("value1"); + * builder.endObject(); + * + * @return a VariantObjectBuilder to build an object + */ + public VariantObjectBuilder startObject() { + onStartNested(); + if (objectBuilder != null) { + throw new IllegalStateException("Cannot call startObject() without calling endObject() first."); + } + if (arrayBuilder != null) { + throw new IllegalStateException("Cannot call startObject() without calling endArray() first."); + } + objectBuilder = new VariantObjectBuilder(this); + return objectBuilder; + } + + /** + * Finishes appending the object to this builder. This method must be called after startObject(), + * before other append*() methods can be called on this builder. + */ + protected void endObject() { + if (objectBuilder == null) { + throw new IllegalStateException("Cannot call endObject() without calling startObject() first."); + } + ArrayList<FieldEntry> fields = objectBuilder.validateAndGetFields(); + int numFields = fields.size(); + Collections.sort(fields); + int maxId = numFields == 0 ? 0 : fields.get(0).id; + int dataSize = numFields == 0 ? 0 : fields.get(0).valueSize; + + int distinctPos = 0; + // Maintain a list of distinct keys in-place. + for (int i = 1; i < numFields; ++i) { + maxId = Math.max(maxId, fields.get(i).id); + if (fields.get(i).id == fields.get(i - 1).id) { + // Found a duplicate key. Keep the field with the greater offset. Review Comment: I think this comment is missing "because it was written last" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
