rdblue commented on code in PR #3202: URL: https://github.com/apache/parquet-java/pull/3202#discussion_r2069391167
########## parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java: ########## @@ -0,0 +1,621 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.variant; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; + +/** + * Builder for creating Variant value and metadata. + */ +public class VariantBuilder { + /** + * Creates a VariantBuilder. + * @param allowDuplicateObjectKeys if true, only the last occurrence of a duplicate object key + * will be kept. Otherwise, an exception will be thrown. + */ + public VariantBuilder(boolean allowDuplicateObjectKeys) { + this.allowDuplicateObjectKeys = allowDuplicateObjectKeys; + } + + /** + * @return the Variant value + */ + public Variant build() { + int numKeys = dictionaryKeys.size(); + // Use long to avoid overflow in accumulating lengths. 
+ long dictionaryTotalDataSize = 0; + for (byte[] key : dictionaryKeys) { + dictionaryTotalDataSize += key.length; + } + // Determine the number of bytes required per offset entry. + // The largest offset is the one-past-the-end value, which is total data size. It's very + // unlikely that the number of keys could be larger, but incorporate that into the calculation + // in case of pathological data. + long maxSize = Math.max(dictionaryTotalDataSize, numKeys); + int offsetSize = getMinIntegerSize((int) maxSize); + + int offsetListOffset = 1 + offsetSize; + int dataOffset = offsetListOffset + (numKeys + 1) * offsetSize; + long metadataSize = dataOffset + dictionaryTotalDataSize; + + byte[] metadata = new byte[(int) metadataSize]; + // Only unsorted dictionary keys are supported. + // TODO: Support sorted dictionary keys. + int headerByte = VariantUtil.VERSION | ((offsetSize - 1) << 6); + VariantUtil.writeLong(metadata, 0, headerByte, 1); + VariantUtil.writeLong(metadata, 1, numKeys, offsetSize); + int currentOffset = 0; + for (int i = 0; i < numKeys; ++i) { + VariantUtil.writeLong(metadata, offsetListOffset + i * offsetSize, currentOffset, offsetSize); + byte[] key = dictionaryKeys.get(i); + System.arraycopy(key, 0, metadata, dataOffset + currentOffset, key.length); + currentOffset += key.length; + } + VariantUtil.writeLong(metadata, offsetListOffset + numKeys * offsetSize, currentOffset, offsetSize); + // Copying the data to a new buffer, to retain only the required data length, not the capacity. + return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); + } + + /** + * Appends a string value to the Variant builder. + * @param str the string value to append + * @return this builder + */ + public VariantBuilder appendString(String str) { + byte[] data = str.getBytes(StandardCharsets.UTF_8); + boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE; + checkCapacity((longStr ? 
1 + VariantUtil.U32_SIZE : 1) + data.length); + if (longStr) { + writeBuffer[writePos] = VariantUtil.HDR_LONG_STRING; + writePos += 1; + VariantUtil.writeLong(writeBuffer, writePos, data.length, VariantUtil.U32_SIZE); + writePos += VariantUtil.U32_SIZE; + } else { + writeBuffer[writePos] = VariantUtil.shortStrHeader(data.length); + writePos += 1; + } + System.arraycopy(data, 0, writeBuffer, writePos, data.length); + writePos += data.length; + return this; + } + + /** + * Appends a null value to the Variant builder. + * @return this builder + */ + public VariantBuilder appendNull() { + checkCapacity(1); + writeBuffer[writePos] = VariantUtil.HDR_NULL; + writePos += 1; + return this; + } + + /** + * Appends a boolean value to the Variant builder. + * @param b the boolean value to append + * @return this builder + */ + public VariantBuilder appendBoolean(boolean b) { + checkCapacity(1); + writeBuffer[writePos] = b ? VariantUtil.HDR_TRUE : VariantUtil.HDR_FALSE; + writePos += 1; + return this; + } + + /** + * Appends a long value to the variant builder. + * @param l the long value to append + * @return this builder + */ + public VariantBuilder appendLong(long l) { + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_INT64; + VariantUtil.writeLong(writeBuffer, writePos + 1, l, 8); + writePos += 9; + return this; + } + + /** + * Appends an int value to the variant builder. + * @param i the int to append + * @return this builder + */ + public VariantBuilder appendInt(int i) { + checkCapacity(1 /* header size */ + 4); + writeBuffer[writePos] = VariantUtil.HDR_INT32; + VariantUtil.writeLong(writeBuffer, writePos + 1, i, 4); + writePos += 5; + return this; + } + + /** + * Appends a short value to the variant builder. 
+ * @param s the short to append + * @return this builder + */ + public VariantBuilder appendShort(short s) { + checkCapacity(1 /* header size */ + 2); + writeBuffer[writePos] = VariantUtil.HDR_INT16; + VariantUtil.writeLong(writeBuffer, writePos + 1, s, 2); + writePos += 3; + return this; + } + + /** + * Appends a byte value to the variant builder. + * @param b the byte to append + * @return this builder + */ + public VariantBuilder appendByte(byte b) { + checkCapacity(1 /* header size */ + 1); + writeBuffer[writePos] = VariantUtil.HDR_INT8; + VariantUtil.writeLong(writeBuffer, writePos + 1, b, 1); + writePos += 2; + return this; + } + + /** + * Appends a double value to the variant builder. + * @param d the double to append + * @return this builder + */ + public VariantBuilder appendDouble(double d) { + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_DOUBLE; + VariantUtil.writeLong(writeBuffer, writePos + 1, Double.doubleToLongBits(d), 8); + writePos += 9; + return this; + } + + /** + * Appends a decimal value to the variant builder. The actual encoded decimal type depends on the + * precision and scale of the decimal value. 
+ * @param d the decimal value to append + * @return this builder + */ + public VariantBuilder appendDecimal(BigDecimal d) { + BigInteger unscaled = d.unscaledValue(); + if (d.scale() <= VariantUtil.MAX_DECIMAL4_PRECISION && d.precision() <= VariantUtil.MAX_DECIMAL4_PRECISION) { + checkCapacity(2 /* header and scale size */ + 4); + writeBuffer[writePos] = VariantUtil.HDR_DECIMAL4; + writeBuffer[writePos + 1] = (byte) d.scale(); + VariantUtil.writeLong(writeBuffer, writePos + 2, unscaled.intValueExact(), 4); + writePos += 6; + } else if (d.scale() <= VariantUtil.MAX_DECIMAL8_PRECISION + && d.precision() <= VariantUtil.MAX_DECIMAL8_PRECISION) { + checkCapacity(2 /* header and scale size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_DECIMAL8; + writeBuffer[writePos + 1] = (byte) d.scale(); + VariantUtil.writeLong(writeBuffer, writePos + 2, unscaled.longValueExact(), 8); + writePos += 10; + } else { + assert d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION + && d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION; + checkCapacity(2 /* header and scale size */ + 16); + writeBuffer[writePos] = VariantUtil.HDR_DECIMAL16; + writeBuffer[writePos + 1] = (byte) d.scale(); + writePos += 2; + // `toByteArray` returns a big-endian representation. We need to copy it reversely and sign + // extend it to 16 bytes. + byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { + writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { + writeBuffer[writePos + i] = sign; + } + writePos += 16; + } + return this; + } + + /** + * Appends a date value to the variant builder. The date is represented as the number of days + * since the epoch. 
+ * @param daysSinceEpoch the number of days since the epoch + * @return this builder + */ + public VariantBuilder appendDate(int daysSinceEpoch) { + checkCapacity(1 /* header size */ + 4); + writeBuffer[writePos] = VariantUtil.HDR_DATE; + VariantUtil.writeLong(writeBuffer, writePos + 1, daysSinceEpoch, 4); + writePos += 5; + return this; + } + + /** + * Appends a TimestampTz value to the variant builder. The timestamp is represented as the number + * of microseconds since the epoch. + * @param microsSinceEpoch the number of microseconds since the epoch + * @return this builder + */ + public VariantBuilder appendTimestampTz(long microsSinceEpoch) { + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_TIMESTAMP_TZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, microsSinceEpoch, 8); + writePos += 9; + return this; + } + + /** + * Appends a TimestampNtz value to the variant builder. The timestamp is represented as the number + * of microseconds since the epoch. + * @param microsSinceEpoch the number of microseconds since the epoch + * @return this builder + */ + public VariantBuilder appendTimestampNtz(long microsSinceEpoch) { + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_TIMESTAMP_NTZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, microsSinceEpoch, 8); + writePos += 9; + return this; + } + + /** + * Appends a Time value to the variant builder. The time is represented as the number of + * microseconds since midnight. + * @param microsSinceMidnight the number of microseconds since midnight + * @return this builder + */ + public VariantBuilder appendTime(long microsSinceMidnight) { + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_TIME; + VariantUtil.writeLong(writeBuffer, writePos + 1, microsSinceMidnight, 8); + writePos += 9; + return this; + } + + /** + * Appends a TimestampNanosTz value to the variant builder. 
The timestamp is represented as the + * number of nanoseconds since the epoch. + * @param nanosSinceEpoch the number of nanoseconds since the epoch + * @return this builder + */ + public VariantBuilder appendTimestampNanosTz(long nanosSinceEpoch) { + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_TIMESTAMP_NANOS_TZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, nanosSinceEpoch, 8); + writePos += 9; + return this; + } + + /** + * Appends a TimestampNanosNtz value to the variant builder. The timestamp is represented as the + * number of nanoseconds since the epoch. + * @param nanosSinceEpoch the number of nanoseconds since the epoch + * @return this builder + */ + public VariantBuilder appendTimestampNanosNtz(long nanosSinceEpoch) { + checkCapacity(1 /* header size */ + 8); + writeBuffer[writePos] = VariantUtil.HDR_TIMESTAMP_NANOS_NTZ; + VariantUtil.writeLong(writeBuffer, writePos + 1, nanosSinceEpoch, 8); + writePos += 9; + return this; + } + + /** + * Appends a float value to the variant builder. + * @param f the float to append + * @return this builder + */ + public VariantBuilder appendFloat(float f) { + checkCapacity(1 /* header size */ + 4); + writeBuffer[writePos] = VariantUtil.HDR_FLOAT; + VariantUtil.writeLong(writeBuffer, writePos + 1, Float.floatToIntBits(f), 8); + writePos += 5; + return this; + } + + /** + * Appends a byte array to the variant builder. + * @param binary the byte array to append + * @return this builder + */ + public VariantBuilder appendBinary(byte[] binary) { + checkCapacity(1 /* header size */ + VariantUtil.U32_SIZE + binary.length); + writeBuffer[writePos] = VariantUtil.HDR_BINARY; + writePos += 1; + VariantUtil.writeLong(writeBuffer, writePos, binary.length, VariantUtil.U32_SIZE); + writePos += VariantUtil.U32_SIZE; + System.arraycopy(binary, 0, writeBuffer, writePos, binary.length); + writePos += binary.length; + return this; + } + + /** + * Appends a UUID value to the variant builder. 
+ * @param uuid the UUID to append + * @return this builder + */ + public VariantBuilder appendUUID(java.util.UUID uuid) { + checkCapacity(1 /* header size */ + VariantUtil.UUID_SIZE); + writeBuffer[writePos] = VariantUtil.HDR_UUID; + writePos += 1; + + ByteBuffer bb = + ByteBuffer.wrap(writeBuffer, writePos, VariantUtil.UUID_SIZE).order(ByteOrder.BIG_ENDIAN); + bb.putLong(uuid.getMostSignificantBits()); + bb.putLong(uuid.getLeastSignificantBits()); + writePos += VariantUtil.UUID_SIZE; + return this; + } + + /** + * Starts appending an object to this variant builder. The returned VariantObjectBuilder must + * be used for future calls to addObjectKey() and endObject(). To add a (key, value) to the + * Variant object, call appendObjectKey() to add the key name, and then append the value. + * endObject() must be called to finish writing the Variant object. + * + * Example usage: + * VariantBuilder builder = new VariantBuilder(); + * VariantObjectBuilder objBuilder = builder.startObject(); + * builder.appendObjectKey(objBuilder, "key1"); + * builder.appendString("value1"); + * builder.endObject(objBuilder); + * + * @return a VariantObjectBuilder to use for appendObjectKey() and endObject(). + */ + public VariantObjectBuilder startObject() { + return new VariantObjectBuilder(this); + } + + /** + * Appends a key to the Variant object. This method must be called before appending the + * corresponding value. + * @param objBuilder the VariantObjectBuilder to use + */ + public void appendObjectKey(VariantObjectBuilder objBuilder, String key) { + objBuilder.appendKey(key); + } + + /** + * Ends appending an object to this variant builder. This method must be called after all keys and + * values have been added to the object. 
+ * @param objBuilder the VariantObjectBuilder to use + * @return this builder + */ + public VariantBuilder endObject(VariantObjectBuilder objBuilder) { + ArrayList<FieldEntry> fields = objBuilder.fields(); + int size = fields.size(); + Collections.sort(fields); + int maxId = size == 0 ? 0 : fields.get(0).id; + if (allowDuplicateObjectKeys) { Review Comment: I find `allowDuplicateObjectKeys` confusing now that I see how it is used. This doesn't actually allow keys to have multiple values like a multi-map; it just enables a mode where the builder will replace existing keys with the last value that is set. I think that you could simplify this by always allowing duplicate object keys. There isn't much of a benefit gained by not allowing duplicate keys, because duplicates then cause a failure even though they could be handled. And the code path to deduplicate the keys is nearly identical in the case where there are no duplicates: the first branch comparing the ID is never taken so the block that copies values to the right offset will not run. That means that the extra cost when there are no duplicate keys is the `fields.set` call that doesn't move the value. And the API would simply handle duplicate keys instead of failing with an exception. I think this should remove `allowDuplicateObjectKeys` and always handle duplicates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
