This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 30ce46c39b [INLONG-10748][Bug] Fix some null pointer dereference risks
in the code. (#10753)
30ce46c39b is described below
commit 30ce46c39bb581a30c0d2610d467983b385db7ea
Author: LeeWY <[email protected]>
AuthorDate: Tue Aug 6 19:02:55 2024 +0800
[INLONG-10748][Bug] Fix some null pointer dereference risks in the code.
(#10753)
* [INLONG-10748] Fix some null pointer dereference risks in the code.
* [INLONG-10748] Optimize the processing logic of `StringUtils.java`.
---------
Co-authored-by: jameswyli <[email protected]>
---
.../org/apache/inlong/agent/plugin/sources/PulsarSource.java | 2 +-
.../inlong/agent/plugin/sources/file/AbstractSource.java | 12 +++++++-----
.../apache/inlong/audit/service/AuditMsgConsumerServer.java | 4 +++-
.../manager/service/source/StreamSourceServiceImpl.java | 2 +-
.../inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java | 2 +-
.../org/apache/inlong/sdk/commons/protocol/ProxyEvent.java | 8 +++++---
.../apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java | 1 +
.../inlong/sort/base/format/JsonDynamicSchemaFormat.java | 2 +-
.../inlong/sort/pulsar/table/PulsarRowDataConverter.java | 6 ++++--
.../sort/pulsar/table/source/PulsarRowDataConverter.java | 6 ++++--
.../org/apache/inlong/sort/formats/util/StringUtils.java | 4 ++++
11 files changed, 32 insertions(+), 17 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index f7c63acba4..9ca7a5aaf4 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -132,7 +132,7 @@ public class PulsarSource extends AbstractSource {
}
return consumer;
} catch (PulsarClientException | IllegalArgumentException e) {
- if (consumer == null) {
+ if (consumer != null) {
try {
consumer.close();
} catch (PulsarClientException ex) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index a477d84093..1fb96f7ff7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -178,12 +178,14 @@ public abstract class AbstractSource implements Source {
continue;
}
emptyCount = 0;
- for (int i = 0; i < lines.size(); i++) {
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
- if (!suc4Queue) {
- break;
+ if (lines != null) {
+ for (int i = 0; i < lines.size(); i++) {
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
+ if (!suc4Queue) {
+ break;
+ }
+ putIntoQueue(lines.get(i));
}
- putIntoQueue(lines.get(i));
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index acd43a7ed6..51ed0caf60 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -97,7 +97,9 @@ public class AuditMsgConsumerServer implements
InitializingBean {
if (storeConfig.isJdbc()) {
jdbcService.start();
}
- mqConsume.start();
+ if (mqConsume != null) {
+ mqConsume.start();
+ }
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index ca6865bdbb..62ccd8f2ca 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -294,7 +294,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
InlongGroupEntity groupEntity =
groupCheckService.checkGroupStatus(groupId, operator);
if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", groupEntity.getInlongGroupId()));
+ String.format("InlongGroup does not exist with
InlongGroupId=%s", groupId));
}
StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
// Remove id in sourceField when save
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
index 1e3938dd54..f97e1197a5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
@@ -255,8 +255,8 @@ public class TcpChannelGroup {
ChannelFuture future =
client.connect(tcpChannel.getIpPort().addr).await();
Channel newChannel = future.getChannel();
tcpChannel.setChannel(newChannel);
- newChannel.setAttachment(oldChannel.getAttachment());
if (oldChannel != null) {
+ newChannel.setAttachment(oldChannel.getAttachment());
oldChannel.disconnect();
oldChannel.close();
}
diff --git
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
index dfe27d2a2b..94f374a0a7 100644
---
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
+++
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -76,10 +76,12 @@ public class ProxyEvent extends SdkEvent {
public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj
obj) {
this.inlongGroupId = inlongGroupId;
this.inlongStreamId = inlongStreamId;
- super.setBody(obj.getBody().toByteArray());
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
- this.msgTime = obj.getMsgTime();
- this.sourceIp = obj.getSourceIp();
+ if (obj != null) {
+ super.setBody(obj.getBody().toByteArray());
+ this.msgTime = obj.getMsgTime();
+ this.sourceIp = obj.getSourceIp();
+ }
Map<String, String> headers = super.getHeaders();
headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId);
headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
index 1f6caf70d1..02722355f6 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
@@ -95,6 +95,7 @@ public class KafkaSeeker implements Seeker {
if (offsetAndTimestamp == null) {
LOGGER.info("tp {} has null offsetAndTimestamp, reset to end", tp);
endOffsetsTopicPartitions.add(tp);
+ return;
}
long expected = offsetAndTimestamp.offset();
long last = consumer.position(tp);
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 5833719e48..ce1797a426 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -103,7 +103,7 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
return new ArrayList<>();
}
JsonNode physicalNode = getPhysicalData(root);
- if (physicalNode.isArray()) {
+ if (physicalNode != null && physicalNode.isArray()) {
// Extract from the first value when the physicalNode is array
physicalNode = physicalNode.get(FIRST);
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
index c66e25c225..82b73dfae0 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
@@ -113,8 +113,10 @@ public class PulsarRowDataConverter implements
Serializable {
new GenericRowData(
rowKind, physicalArity +
readableMetadata.getConnectorMetadataArity());
- for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
- producedRow.setField(valueProjection[valuePos],
physicalValueRow.getField(valuePos));
+ if (physicalValueRow != null) {
+ for (int valuePos = 0; valuePos < valueProjection.length;
valuePos++) {
+ producedRow.setField(valueProjection[valuePos],
physicalValueRow.getField(valuePos));
+ }
}
for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
index 44abbaf7c4..661fad696c 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
@@ -113,8 +113,10 @@ public class PulsarRowDataConverter implements
Serializable {
new GenericRowData(
rowKind, physicalArity +
readableMetadata.getConnectorMetadataArity());
- for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
- producedRow.setField(valueProjection[valuePos],
physicalValueRow.getField(valuePos));
+ if (physicalValueRow != null) {
+ for (int valuePos = 0; valuePos < valueProjection.length;
valuePos++) {
+ producedRow.setField(valueProjection[valuePos],
physicalValueRow.getField(valuePos));
+ }
}
for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
index 3d06625212..f33ad8e825 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
@@ -81,6 +81,10 @@ public class StringUtils {
Map<String, String> fields = new HashMap<>();
List<Map<String, String>> lines = new ArrayList<>();
+ if (text == null) {
+ return lines;
+ }
+
StringBuilder stringBuilder = new StringBuilder();
String key = "";