leekeiabstraction commented on code in PR #1992:
URL: https://github.com/apache/fluss/pull/1992#discussion_r2559545879
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Adapter that converts {@link InternalRow} records from a {@link LogScanner} into POJOs of type T.
+ */
+public class TypedLogScanner<T> implements LogScanner<T> {
+
+    private final LogScanner<InternalRow> delegate;
+    private final RowToPojoConverter<T> converter;
+
+    public TypedLogScanner(
+            LogScanner<InternalRow> delegate,
+            Class<T> pojoClass,
+            TableInfo tableInfo,
+            int[] projectedColumns) {
+        this.delegate = delegate;
+        RowType tableSchema = tableInfo.getRowType();
+        RowType projection =
+                projectedColumns == null ? tableSchema : tableSchema.project(projectedColumns);
+        this.converter = RowToPojoConverter.of(pojoClass, tableSchema, projection);
+    }
+
+    @Override
+    public ScanRecords<T> poll(Duration timeout) {
+        ScanRecords<InternalRow> records = delegate.poll(timeout);
+        if (records == null || records.isEmpty()) {
+            return ScanRecords.empty();
+        }
+        Map<TableBucket, List<ScanRecord<T>>> out = new HashMap<>();
+        for (TableBucket bucket : records.buckets()) {
+            List<ScanRecord<InternalRow>> list = records.records(bucket);
+            List<ScanRecord<T>> converted = new ArrayList<>(list.size());
+            for (ScanRecord<InternalRow> r : list) {
+                InternalRow row = r.getValue();
+                T pojo = converter.fromRow(row);
+                converted.add(
+                        new ScanRecord<>(r.logOffset(), r.timestamp(), r.getChangeType(), pojo));
+            }
+            out.put(bucket, converted);

Review Comment:
I like the clean delegate approach; however, what is the performance implication here? Compared with non-POJO use cases, wouldn't we be looking at roughly 2x memory usage per poll (the `ScanRecord<InternalRow>` lists and the converted `ScanRecord<T>` lists are held at the same time while converting) and 2x conversions per record? Should POJO conversion be pushed down to, e.g., `CompletedFetch.toScanRecord(LogRecord)`? If not, we should at least note this as a future improvement.

##########
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java:
##########
@@ -73,12 +74,13 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
         for (Map.Entry<String, Field> e : allFields.entrySet()) {
             String name = e.getKey();
             Field field = e.getValue();
+            // Enforce nullable fields: primitives are not allowed in POJO definitions.
             if (field.getType().isPrimitive()) {
                 throw new IllegalArgumentException(
-                        String.format(
-                                "POJO class %s has primitive field '%s' of type %s. Primitive types are not allowed; all fields must be nullable (use wrapper types).",
-                                pojoClass.getName(), name, field.getType().getName()));
+                        "Primitive types are not allowed; all fields must be nullable (use wrapper types).");

Review Comment:
Curious why we're removing information from the exception message (and therefore from the logs) that would be useful for debugging? It would be nice to at least retain the `pojoClass` name so that users know which POJO the `IllegalArgumentException` is thrown for.

-- 
This is an automated message from the Apache Git Service.

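For the second review comment (on `PojoType`), here is a minimal sketch of a primitive-field check that keeps the diagnostic details, reusing the message format from the removed `String.format` lines. `PrimitiveFieldCheck` and `requireNoPrimitiveFields` are hypothetical names, and walking the class hierarchy with `getDeclaredFields()` while skipping static and synthetic fields is an assumption; the hunk does not show how `PojoType` actually builds `allFields`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical sketch (NOT Fluss's PojoType): rejects primitive fields while keeping the POJO
// class name, field name and field type in the exception message.
class PrimitiveFieldCheck {

    static void requireNoPrimitiveFields(Class<?> pojoClass) {
        // Assumption: inspect the class and its superclasses, stopping before Object.
        for (Class<?> c = pojoClass; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                    continue; // only instance fields are treated as columns in this sketch
                }
                if (field.getType().isPrimitive()) {
                    throw new IllegalArgumentException(
                            String.format(
                                    "POJO class %s has primitive field '%s' of type %s. "
                                            + "Primitive types are not allowed; all fields must be nullable (use wrapper types).",
                                    pojoClass.getName(), field.getName(), field.getType().getName()));
                }
            }
        }
    }

    /** Accepted: every field uses a nullable wrapper or reference type. */
    static class GoodPojo {
        Long id;
        String name;
    }

    /** Rejected: 'id' is a primitive long. */
    static class BadPojo {
        long id;
        String name;
    }

    public static void main(String[] args) {
        requireNoPrimitiveFields(GoodPojo.class); // no exception
        try {
            requireNoPrimitiveFields(BadPojo.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Run from the default package, the rejected case prints a message beginning `POJO class PrimitiveFieldCheck$BadPojo has primitive field 'id' of type long.`, so users can see both the offending POJO and the offending field.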