FANNG1 commented on code in PR #6303:
URL: https://github.com/apache/gravitino/pull/6303#discussion_r1923792459


##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java:
##########
@@ -32,25 +34,81 @@ public interface PropertiesConverter {
 
   /**
    * Converts properties from application provided properties and Flink 
connector properties to
-   * Gravitino properties.
+   * Gravitino properties.This method processes the Flink configuration and 
transforms it into a
+   * format suitable for the Gravitino catalog.
    *
-   * @param flinkConf The configuration provided by Flink.
-   * @return properties for the Gravitino catalog.
+   * @param flinkConf The Flink configuration containing connector properties. 
This includes both
+   *     Flink-specific properties and any user-provided properties.
+   * @return A map of properties converted for use in the Gravitino catalog. 
The returned map
+   *     includes both directly transformed properties and bypass properties 
prefixed with {@link
+   *     #FLINK_PROPERTY_PREFIX}.
    */
   default Map<String, String> toGravitinoCatalogProperties(Configuration 
flinkConf) {
-    return flinkConf.toMap();
+    Map<String, String> gravitinoProperties = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
+      String gravitinoKey = 
transformPropertyToGravitinoCatalog(entry.getKey());
+      if (gravitinoKey != null) {
+        gravitinoProperties.put(gravitinoKey, entry.getValue());
+      } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
+        gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), 
entry.getValue());
+      } else {
+        gravitinoProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return gravitinoProperties;
   }
 
   /**
-   * Converts properties from Gravitino properties to Flink connector 
properties.
+   * Converts properties from Gravitino catalog properties to Flink connector 
properties. This
+   * method processes the Gravitino properties and transforms them into a 
format suitable for the
+   * Flink connector.
    *
-   * @param gravitinoProperties The properties provided by Gravitino.
-   * @return properties for the Flink connector.
+   * @param gravitinoProperties The properties provided by the Gravitino 
catalog. This includes both
+   *     Gravitino-specific properties and any bypass properties prefixed with 
{@link
+   *     #FLINK_PROPERTY_PREFIX}.
+   * @return A map of properties converted for use in the Flink connector. The 
returned map includes
+   *     both transformed properties and the Flink catalog type.
    */
   default Map<String, String> toFlinkCatalogProperties(Map<String, String> 
gravitinoProperties) {
-    return gravitinoProperties;
+    Map<String, String> allProperties = Maps.newHashMap();
+    gravitinoProperties.forEach(
+        (key, value) -> {
+          String flinkConfigKey = key;

Review Comment:
   It seems this logic is not correct: if the key starts with flink.bypass, we 
should extract the key; otherwise, we get the converted key from 
`transformPropertyToFlinkCatalog`.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java:
##########
@@ -32,25 +34,81 @@ public interface PropertiesConverter {
 
   /**
    * Converts properties from application provided properties and Flink 
connector properties to
-   * Gravitino properties.
+   * Gravitino properties.This method processes the Flink configuration and 
transforms it into a
+   * format suitable for the Gravitino catalog.
    *
-   * @param flinkConf The configuration provided by Flink.
-   * @return properties for the Gravitino catalog.
+   * @param flinkConf The Flink configuration containing connector properties. 
This includes both
+   *     Flink-specific properties and any user-provided properties.
+   * @return A map of properties converted for use in the Gravitino catalog. 
The returned map
+   *     includes both directly transformed properties and bypass properties 
prefixed with {@link
+   *     #FLINK_PROPERTY_PREFIX}.
    */
   default Map<String, String> toGravitinoCatalogProperties(Configuration 
flinkConf) {
-    return flinkConf.toMap();
+    Map<String, String> gravitinoProperties = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
+      String gravitinoKey = 
transformPropertyToGravitinoCatalog(entry.getKey());
+      if (gravitinoKey != null) {
+        gravitinoProperties.put(gravitinoKey, entry.getValue());
+      } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
+        gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), 
entry.getValue());
+      } else {
+        gravitinoProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return gravitinoProperties;
   }
 
   /**
-   * Converts properties from Gravitino properties to Flink connector 
properties.
+   * Converts properties from Gravitino catalog properties to Flink connector 
properties. This
+   * method processes the Gravitino properties and transforms them into a 
format suitable for the
+   * Flink connector.
    *
-   * @param gravitinoProperties The properties provided by Gravitino.
-   * @return properties for the Flink connector.
+   * @param gravitinoProperties The properties provided by the Gravitino 
catalog. This includes both
+   *     Gravitino-specific properties and any bypass properties prefixed with 
{@link
+   *     #FLINK_PROPERTY_PREFIX}.
+   * @return A map of properties converted for use in the Flink connector. The 
returned map includes
+   *     both transformed properties and the Flink catalog type.
    */
   default Map<String, String> toFlinkCatalogProperties(Map<String, String> 
gravitinoProperties) {
-    return gravitinoProperties;
+    Map<String, String> allProperties = Maps.newHashMap();
+    gravitinoProperties.forEach(
+        (key, value) -> {
+          String flinkConfigKey = key;

Review Comment:
   It seems this logic is not correct: if the key starts with flink.bypass, we 
should extract the key; otherwise, we get the converted key from 
`transformPropertyToFlinkCatalog`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to