This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new db5619d5b3 [INLONG-11621][Sort] Support auth key encryption when use inlong sdk dirty sink (#11653)
db5619d5b3 is described below

commit db5619d5b342d1558a3e20084dd5494790151bb7
Author: vernedeng <[email protected]>
AuthorDate: Mon Jan 6 19:06:01 2025 +0800

    [INLONG-11621][Sort] Support auth key encryption when use inlong sdk dirty sink (#11653)
    
    * [INLONG-11621][Sort] Support auth key encryption when using inlong sdk dirty sink
---
 .../base/dirty/sink/sdk/InlongSdkDirtySink.java    |  1 -
 .../dirty/sink/sdk/InlongSdkDirtySinkFactory.java  | 37 ++++++--
 .../inlong/sort/base/dirty/utils/AESUtils.java     | 99 ++++++++++++++++++++++
 3 files changed, 130 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index daec1c0694..22a5463896 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -88,7 +88,6 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
         converter = 
FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
         fieldGetters = 
FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
 
-        log.info("inlong sdk dirty options={}", options);
         // init sender
         dirtySender = InlongSdkDirtySender.builder()
                 .inlongManagerAddr(options.getInlongManagerAddr())
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
index 053aa58364..970775459d 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.base.dirty.sink.sdk;
 
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+import org.apache.inlong.sort.base.dirty.utils.AESUtils;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -28,12 +30,14 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS;
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
 
@@ -86,11 +90,16 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
 
     @Override
     public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context 
context) {
-        ReadableConfig config = 
Configuration.fromMap(context.getCatalogTable().getOptions());
-        FactoryUtil.validateFactoryOptions(this, config);
-        InlongSdkDirtyOptions options = getOptions(config);
-        return new InlongSdkDirtySink<>(options,
-                
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+        try {
+            ReadableConfig config = 
Configuration.fromMap(context.getCatalogTable().getOptions());
+            FactoryUtil.validateFactoryOptions(this, config);
+            InlongSdkDirtyOptions options = getOptions(config);
+            return new InlongSdkDirtySink<>(options,
+                    
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+        } catch (Throwable t) {
+            log.warn("failed to create dirty sink", t);
+            return null;
+        }
     }
 
     private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
@@ -100,7 +109,9 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
                 .sendToGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
                 .sendToStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
                 
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
-                
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
+                .inlongManagerAuthKey(
+                        decrypt(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY),
+                                config.get(DIRTY_SIDE_OUTPUT_LABELS)))
                 
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
                 
.ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
                 .retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
@@ -109,6 +120,20 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
                 .build();
     }
 
+    private String decrypt(String encrypted, String key) {
+        String decrypted = null;
+
+        try {
+            byte[] bytes = AESUtils.parseHexStr2Byte(encrypted);
+            bytes = AESUtils.decrypt(bytes, 
key.trim().getBytes(StandardCharsets.UTF_8));
+            decrypted = new String(Base64.decodeBase64(bytes), 
StandardCharsets.UTF_8);
+        } catch (Throwable t) {
+            log.warn("failed to decrypt {} with key {}", encrypted, key, t);
+            throw new RuntimeException(t);
+        }
+        return decrypted;
+    }
+
     @Override
     public String factoryIdentifier() {
         return IDENTIFIER;
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java
new file mode 100644
index 0000000000..e86422ba8f
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.utils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import java.security.SecureRandom;
+
+/**
+ * AES encryption and decryption utils.
+ */
+@Slf4j
+public class AESUtils {
+
+    private static final int KEY_SIZE = 128;
+    private static final String ALGORITHM = "AES";
+    private static final String RNG_ALGORITHM = "SHA1PRNG";
+
+    /**
+     * Generate key
+     */
+    private static SecretKey generateKey(byte[] aesKey) throws Exception {
+        SecureRandom random = SecureRandom.getInstance(RNG_ALGORITHM);
+        random.setSeed(aesKey);
+        KeyGenerator gen = KeyGenerator.getInstance(ALGORITHM);
+        gen.init(KEY_SIZE, random);
+        return gen.generateKey();
+    }
+
+    /**
+     * Encrypt by key
+     */
+    public static byte[] encrypt(byte[] plainBytes, byte[] key) throws 
Exception {
+        SecretKey secKey = generateKey(key);
+        Cipher cipher = Cipher.getInstance(ALGORITHM);
+        cipher.init(Cipher.ENCRYPT_MODE, secKey);
+        return cipher.doFinal(plainBytes);
+    }
+
+    /**
+     * Decrypt by key and specified version
+     */
+    public static byte[] decrypt(byte[] cipherBytes, byte[] key) throws 
Exception {
+        SecretKey secKey = generateKey(key);
+        Cipher cipher = Cipher.getInstance(ALGORITHM);
+        cipher.init(Cipher.DECRYPT_MODE, secKey);
+        return cipher.doFinal(cipherBytes);
+    }
+
+    /**
+     * Parse byte to String in Hex type
+     */
+    public static String parseByte2HexStr(byte[] buf) {
+        StringBuilder strBuf = new StringBuilder();
+        for (byte b : buf) {
+            String hex = Integer.toHexString(b & 0xFF);
+            if (hex.length() == 1) {
+                hex = '0' + hex;
+            }
+            strBuf.append(hex.toUpperCase());
+        }
+        return strBuf.toString();
+    }
+
+    /**
+     * Parse String to byte as Hex type
+     */
+    public static byte[] parseHexStr2Byte(String hexStr) {
+        if (hexStr.length() < 1) {
+            return null;
+        }
+        byte[] result = new byte[hexStr.length() / 2];
+        for (int i = 0; i < hexStr.length() / 2; i++) {
+            int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 
16);
+            int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 
16);
+            result[i] = (byte) (high * 16 + low);
+        }
+        return result;
+    }
+}

Reply via email to