smallx commented on a change in pull request #3862:
URL: https://github.com/apache/iceberg/pull/3862#discussion_r780741610
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -850,4 +865,47 @@ public String unknown(String sourceName, int sourceId,
String transform,
return String.format("%s(%s) %s %s", transform, sourceName, direction,
nullOrder);
}
}
+
+ public static Object convertPartitionType(Object value, DataType dataType) {
+ if (value == null && dataType instanceof NullType) {
+ return null;
+ }
+ String old = String.valueOf(value);
+ if (dataType instanceof BooleanType) {
+ return DatatypeConverter.parseBoolean(old);
+ }
+ else if (dataType instanceof ByteType) {
+ return DatatypeConverter.parseByte(old);
+ }
+ else if (dataType instanceof ShortType) {
+ return DatatypeConverter.parseShort(old);
+ }
+ else if (dataType instanceof IntegerType) {
+ return DatatypeConverter.parseInt(old);
+ }
+ else if (dataType instanceof DateType) {
+ // days(ts) or date(ts) partition schema DataType
+ return DateTimeUtil.daysFromDate(LocalDate.parse(old));
+ }
+ else if (dataType instanceof FloatType) {
+ return DatatypeConverter.parseFloat(old);
+ }
+ else if (dataType instanceof DoubleType) {
+ return DatatypeConverter.parseDouble(old);
+ }
+ else if (dataType instanceof LongType) {
+ return DatatypeConverter.parseLong(old);
+ }
+ else if (dataType instanceof DecimalType) {
+ return DatatypeConverter.parseDecimal(old);
+ }
+ if (dataType instanceof BinaryType) {
+ return DatatypeConverter.parseHexBinary(old);
+ }
+ else if (dataType instanceof StringType) {
+ return UTF8String.fromString(old);
+ } else {
+ return value;
+ }
Review comment:
这几行代码风格又不一致了. 另外, 对于上面的代码, 虽然这样的风格也可以, 但保持和项目中其他风格统一会更好.
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -313,4 +324,75 @@ private static CaseInsensitiveStringMap
addSnapshotId(CaseInsensitiveStringMap o
return options;
}
+
+ @Override
+ public StructType partitionSchema() {
+ Schema schema = icebergTable.spec().schema();
+ List<PartitionField> fields = icebergTable.spec().fields();
+ List<Types.NestedField> structFields =
Lists.newArrayListWithExpectedSize(fields.size());
+ fields.forEach(f -> {
+ Type resultType = Types.StringType.get();
+ Type sourceType = schema.findType(f.sourceId());
+ if (!f.name().endsWith("hour") && !f.name().endsWith("month")) {
+ resultType = f.transform().getResultType(sourceType);
+ }
+ structFields.add(Types.NestedField.optional(f.fieldId(), f.name(),
resultType));
+ });
+ return (StructType)
SparkSchemaUtil.convert(Types.StructType.of(structFields));
+ }
+
+ @Override
+ public void createPartition(InternalRow ident, Map<String, String>
properties) throws UnsupportedOperationException {
+ // use Iceberg SQL extensions
+ }
+
+ @Override
+ public boolean dropPartition(InternalRow ident) {
+ // use Iceberg SQL extensions
+ return false;
+ }
+
+ @Override
+ public void replacePartitionMetadata(InternalRow ident, Map<String, String>
properties)
+ throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("Iceberg partitions do not support
metadata");
+ }
+
+ @Override
+ public Map<String, String> loadPartitionMetadata(InternalRow ident) throws
UnsupportedOperationException {
+ throw new UnsupportedOperationException("Iceberg partitions do not support
metadata");
+ }
+
+ @Override
+ public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow
ident) {
+ // support [show partitions] syntax
+ if (!icebergTable.spec().isUnpartitioned()){
+ if (names.length > 0){
+ return new InternalRow[]{ident};
+ } else {
+ String fileFormat = icebergTable.properties()
+ .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ List<SparkTableUtil.SparkPartition> partitions =
Spark3Util.getPartitions(sparkSession(),
+ new Path(icebergTable.location().concat("\\data")),
fileFormat);
+ List<InternalRow> rows = Lists.newArrayList();
+ StructType schema = partitionSchema();
+ StructField[] fields = schema.fields();
+ partitions.forEach(p -> {
+ int i = 0;
+ Map<String, String> values = p.getValues();
+ List<Object> dataTypeVal = Lists.newArrayList();
+ while (i < fields.length) {
+ DataType dataType = schema.apply(fields[i].name()).dataType();
+
dataTypeVal.add(Spark3Util.convertPartitionType(values.get(fields[i].name()),
dataType));
+ i += 1;
+ }
+ rows.add(new GenericInternalRow(dataTypeVal.toArray()));
+ });
+ return rows.toArray(new InternalRow[0]);
+ }
+ } else{
Review comment:
小虫: `else{` -> `else {`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]