XiaoHongbo-Hope commented on code in PR #7786:
URL: https://github.com/apache/paimon/pull/7786#discussion_r3207805790


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java:
##########
@@ -18,18 +18,157 @@
 
 package org.apache.paimon.flink.function;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.flink.FlinkFileIOLoader;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
 
+import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.ScalarFunction;
 
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
 /** Flink scalar function that constructs a serialized {@link BlobViewStruct}. 
*/
-public class BlobViewFunction extends ScalarFunction {
+public class BlobViewFunction extends ScalarFunction implements 
CatalogAwareFunction {
+
+    @Nullable private String catalogName;
+    @Nullable private Options catalogOptions;
+
+    private transient Catalog catalog;
+    private transient Map<String, Map<String, BlobViewField>> 
blobViewFieldCache;
+
+    @Override
+    public void setCatalogContext(
+            String catalogName, String defaultDatabase, Options 
catalogOptions) {
+        this.catalogName = catalogName;
+        this.catalogOptions = new Options(catalogOptions.toMap());
+        this.catalog = null;
+        this.blobViewFieldCache = null;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        openCatalog(context.getUserCodeClassLoader());
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
+    }
 
     public byte[] eval(String identifier, int fieldId, long rowId) {
         if (identifier == null) {
             return null;
         }
         return new BlobViewStruct(Identifier.fromString(identifier), fieldId, 
rowId).serialize();
     }
+
+    public byte[] eval(String tableName, String fieldName, long rowId) {
+        if (tableName == null || fieldName == null) {
+            return null;
+        }
+
+        BlobViewField field = blobViewField(tableName, fieldName);
+        return new BlobViewStruct(field.tableName, field.fieldId, 
rowId).serialize();
+    }
+
+    private BlobViewField blobViewField(String tableName, String fieldName) {
+        Map<String, BlobViewField> tableCache = 
blobViewFieldCache().get(tableName);
+        if (tableCache != null) {
+            BlobViewField cached = tableCache.get(fieldName);
+            if (cached != null) {
+                return cached;
+            }
+        }
+
+        Identifier identifier = toIdentifier(tableName);
+        BlobViewField field = new BlobViewField(identifier, 
fieldId(identifier, fieldName));
+        if (tableCache == null) {
+            tableCache = new HashMap<>();
+            blobViewFieldCache().put(tableName, tableCache);
+        }
+        tableCache.put(fieldName, field);
+        return field;
+    }
+
+    private int fieldId(Identifier identifier, String fieldName) {
+        try {
+            Table table = catalog().getTable(identifier);
+            if (!table.rowType().containsField(fieldName)) {

Review Comment:
   Should we also check that the resolved field is a BLOB field here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to