This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new db5619d5b3 [INLONG-11621][Sort] Support auth key encryption when using
inlong sdk dirty sink (#11653)
db5619d5b3 is described below
commit db5619d5b342d1558a3e20084dd5494790151bb7
Author: vernedeng <[email protected]>
AuthorDate: Mon Jan 6 19:06:01 2025 +0800
[INLONG-11621][Sort] Support auth key encryption when using inlong sdk dirty
sink (#11653)
* [INLONG-11621][Sort] Support auth key encryption when using inlong sdk
dirty sink
---
.../base/dirty/sink/sdk/InlongSdkDirtySink.java | 1 -
.../dirty/sink/sdk/InlongSdkDirtySinkFactory.java | 37 ++++++--
.../inlong/sort/base/dirty/utils/AESUtils.java | 99 ++++++++++++++++++++++
3 files changed, 130 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index daec1c0694..22a5463896 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -88,7 +88,6 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
converter =
FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
fieldGetters =
FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
- log.info("inlong sdk dirty options={}", options);
// init sender
dirtySender = InlongSdkDirtySender.builder()
.inlongManagerAddr(options.getInlongManagerAddr())
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
index 053aa58364..970775459d 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.base.dirty.sink.sdk;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+import org.apache.inlong.sort.base.dirty.utils.AESUtils;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -28,12 +30,14 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
@@ -86,11 +90,16 @@ public class InlongSdkDirtySinkFactory implements
DirtySinkFactory {
@Override
public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context
context) {
- ReadableConfig config =
Configuration.fromMap(context.getCatalogTable().getOptions());
- FactoryUtil.validateFactoryOptions(this, config);
- InlongSdkDirtyOptions options = getOptions(config);
- return new InlongSdkDirtySink<>(options,
-
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+ try {
+ ReadableConfig config =
Configuration.fromMap(context.getCatalogTable().getOptions());
+ FactoryUtil.validateFactoryOptions(this, config);
+ InlongSdkDirtyOptions options = getOptions(config);
+ return new InlongSdkDirtySink<>(options,
+
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+ } catch (Throwable t) {
+ log.warn("failed to create dirty sink", t);
+ return null;
+ }
}
private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
@@ -100,7 +109,9 @@ public class InlongSdkDirtySinkFactory implements
DirtySinkFactory {
.sendToGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
.sendToStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
-
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
+ .inlongManagerAuthKey(
+ decrypt(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY),
+ config.get(DIRTY_SIDE_OUTPUT_LABELS)))
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
.ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
.retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
@@ -109,6 +120,20 @@ public class InlongSdkDirtySinkFactory implements
DirtySinkFactory {
.build();
}
+ private String decrypt(String encrypted, String key) {
+ String decrypted = null;
+
+ try {
+ byte[] bytes = AESUtils.parseHexStr2Byte(encrypted);
+ bytes = AESUtils.decrypt(bytes,
key.trim().getBytes(StandardCharsets.UTF_8));
+ decrypted = new String(Base64.decodeBase64(bytes),
StandardCharsets.UTF_8);
+ } catch (Throwable t) {
+ log.warn("failed to decrypt {} with key {}", encrypted, key, t);
+ throw new RuntimeException(t);
+ }
+ return decrypted;
+ }
+
@Override
public String factoryIdentifier() {
return IDENTIFIER;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java
new file mode 100644
index 0000000000..e86422ba8f
--- /dev/null
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.utils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import java.security.SecureRandom;
+
+/**
+ * AES encryption and decryption utils.
+ */
+@Slf4j
+public class AESUtils {
+
+ private static final int KEY_SIZE = 128;
+ private static final String ALGORITHM = "AES";
+ private static final String RNG_ALGORITHM = "SHA1PRNG";
+
+ /**
+ * Generate key
+ */
+ private static SecretKey generateKey(byte[] aesKey) throws Exception {
+ SecureRandom random = SecureRandom.getInstance(RNG_ALGORITHM);
+ random.setSeed(aesKey);
+ KeyGenerator gen = KeyGenerator.getInstance(ALGORITHM);
+ gen.init(KEY_SIZE, random);
+ return gen.generateKey();
+ }
+
+ /**
+ * Encrypt by key
+ */
+ public static byte[] encrypt(byte[] plainBytes, byte[] key) throws
Exception {
+ SecretKey secKey = generateKey(key);
+ Cipher cipher = Cipher.getInstance(ALGORITHM);
+ cipher.init(Cipher.ENCRYPT_MODE, secKey);
+ return cipher.doFinal(plainBytes);
+ }
+
+ /**
+ * Decrypt by key
+ */
+ public static byte[] decrypt(byte[] cipherBytes, byte[] key) throws
Exception {
+ SecretKey secKey = generateKey(key);
+ Cipher cipher = Cipher.getInstance(ALGORITHM);
+ cipher.init(Cipher.DECRYPT_MODE, secKey);
+ return cipher.doFinal(cipherBytes);
+ }
+
+ /**
+ * Convert a byte array to an upper-case hex string
+ */
+ public static String parseByte2HexStr(byte[] buf) {
+ StringBuilder strBuf = new StringBuilder();
+ for (byte b : buf) {
+ String hex = Integer.toHexString(b & 0xFF);
+ if (hex.length() == 1) {
+ hex = '0' + hex;
+ }
+ strBuf.append(hex.toUpperCase());
+ }
+ return strBuf.toString();
+ }
+
+ /**
+ * Convert a hex string to a byte array
+ */
+ public static byte[] parseHexStr2Byte(String hexStr) {
+ if (hexStr.length() < 1) {
+ return null;
+ }
+ byte[] result = new byte[hexStr.length() / 2];
+ for (int i = 0; i < hexStr.length() / 2; i++) {
+ int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1),
16);
+ int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2),
16);
+ result[i] = (byte) (high * 16 + low);
+ }
+ return result;
+ }
+}