luoyuxia opened a new issue, #1745:
URL: https://github.com/apache/fluss/issues/1745

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Fluss version
   
   0.7.0 (latest release)
   
   ### Please describe the bug 🐞
   
   In the definition, custom properties are not understood by Fluss. When using 
Flink to create fluss table,  the option understood by Fluss will be Fluss 
property. That's right. But the option understood by Fluss will also be set as 
custom properties.
   
   Can be reproduce as following code:
   ```
   void testTableConversionWithOptions() {
           Map<String, String> options = new HashMap<>();
           // forward table option & enum type
           options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
           // forward client memory option
           options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), 
"64mb");
           // forward client duration option
           options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
   
           ResolvedSchema schema =
                   new ResolvedSchema(
                           Collections.singletonList(
                                   Column.physical(
                                           "order_id",
                                           
org.apache.flink.table.api.DataTypes.STRING().notNull())),
                           Collections.emptyList(),
                           null);
           CatalogTable flinkTable =
                   CatalogTable.of(
                           
Schema.newBuilder().fromResolvedSchema(schema).build(),
                           "test comment",
                           Collections.emptyList(),
                           options);
   
           TableDescriptor flussTable =
                   FlinkConversions.toFlussTable(new 
ResolvedCatalogTable(flinkTable, schema));
   
           assertThat(flussTable.getProperties())
                   .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), 
"indexed");
   
           ------ modify as following 
          HashMap<String, String> customProperties = new HashMap<>();
           
customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), 
"64mb");
           
customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
           
assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
       }
   ```
   
   The test will fail, 
   ```
   Actual and expected should have same size but actual size is:
     3
   while expected size is:
     2
   Actual was:
     {"client.writer.batch-timeout"="32s", 
"client.writer.buffer.memory-size"="64mb", "table.log.format"="indexed"}
   Expected was:
     ["client.writer.buffer.memory-size"="64mb", 
"client.writer.batch-timeout"="32s"]
   ```
   
   "table.log.format"="indexed" is also in custom property.
   
   @wuchong Is an issue needed to be resolved in v0.8
   
   ### Solution
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to