lwn3148 commented on code in PR #135:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/135#discussion_r2779631937


##########
flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java:
##########
@@ -0,0 +1,298 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain 
string, e.g.
+ * 'myusers', all the records will be consistently written into "myusers" 
index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to 
reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of 
TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is 
compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 
'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written 
into
+ * "myusers_2020-03-27" index.
+ */
+@Internal
+final class IndexGeneratorFactory {
+
+    private IndexGeneratorFactory() {}
+
+    public static IndexGenerator createIndexGenerator(
+            String index,
+            List<String> fieldNames,
+            List<DataType> dataTypes,
+            ZoneId localTimeZoneId) {
+        final IndexHelper indexHelper = new IndexHelper();
+        if (indexHelper.checkIsDynamicIndex(index)) {
+            return createRuntimeIndexGenerator(
+                    index,
+                    fieldNames.toArray(new String[0]),
+                    dataTypes.toArray(new DataType[0]),
+                    indexHelper,
+                    localTimeZoneId);
+        } else {
+            return new StaticIndexGenerator(index);
+        }
+    }
+
+    public static IndexGenerator createIndexGenerator(
+            String index, List<String> fieldNames, List<DataType> dataTypes) {
+        return createIndexGenerator(index, fieldNames, dataTypes, 
ZoneId.systemDefault());
+    }
+
+    interface DynamicFormatter extends Serializable {
+        String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+    }
+
+    private static IndexGenerator createRuntimeIndexGenerator(
+            String index,
+            String[] fieldNames,
+            DataType[] fieldTypes,
+            IndexHelper indexHelper,
+            ZoneId localTimeZoneId) {
+        final String dynamicIndexPatternStr = 
indexHelper.extractDynamicIndexPatternStr(index);
+        final String indexPrefix = index.substring(0, 
index.indexOf(dynamicIndexPatternStr));
+        final String indexSuffix =
+                index.substring(indexPrefix.length() + 
dynamicIndexPatternStr.length());
+
+        if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(
+                            index, 
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+                @Override
+                public String generate(RowData row) {
+                    return indexPrefix
+                            
.concat(LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter))

Review Comment:
   Thanks for the suggestion! Note that `+` is optimized since Java 9 (JEP 280) 
and typically outperforms manual `StringBuilder` for simple concatenation. I’ll 
switch to `+` for better readability and performance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to