This is an automated email from the ASF dual-hosted git repository.
youling1128 pushed a commit to branch 2.8.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/2.8.x by this push:
new 1b4624f7c [#4835] Fixed the fileUpload stream not closed problem
(#4837)
1b4624f7c is described below
commit 1b4624f7c6c372cfa216ffe24b34f8955227df5f
Author: Alex <[email protected]>
AuthorDate: Sat Jun 14 15:00:32 2025 +0800
[#4835] Fixed the fileUpload stream not closed problem (#4837)
---
.../foundation/vertx/http/FileUploadPart.java | 3 +-
.../vertx/http/FileUploadStreamRecorder.java | 213 +++++++++++++++++++++
.../foundation/vertx/http/InputStreamWrapper.java | 80 ++++++++
3 files changed, 295 insertions(+), 1 deletion(-)
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java
index 9e72d64d2..ba7acb2ef 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java
@@ -36,7 +36,8 @@ public class FileUploadPart extends AbstractPart {
@Override
public InputStream getInputStream() throws IOException {
- return Files.newInputStream(new
File(fileUpload.uploadedFileName()).toPath());
+ final InputStream inputStream = Files.newInputStream(new
File(fileUpload.uploadedFileName()).toPath());
+ return new InputStreamWrapper(inputStream);
}
@Override
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java
new file mode 100644
index 000000000..289914f98
--- /dev/null
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.foundation.vertx.http;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+import com.netflix.config.DynamicPropertyFactory;
+
+public class FileUploadStreamRecorder {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileUploadStreamRecorder.class);
+
+ private static final FileUploadStreamRecorder RECORDER = new
FileUploadStreamRecorder();
+
+ public static final String STREAM_RECORDER_MAX_SIZE =
"servicecomb.uploads.file.streamRecorder.maxSize";
+
+ public static final String STREAM_STACKTRACE_ENABLED
+ = "servicecomb.uploads.file.streamRecorder.stackTraceEnabled";
+
+ public static final String STREAM_CHECK_INTERVAL =
"servicecomb.uploads.file.streamRecorder.checkInterval";
+
+ public static final String STREAM_MAX_OPEN_TIME =
"servicecomb.uploads.file.streamRecorder.streamMaxOpenTime";
+
+ public static final int DEFAULT_STREAM_RECORDER_MAX_SIZE = 5000;
+
+ public static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L;
+
+ public static final long DEFAULT_STREAM_MAX_OPEN_TIME = 90000L;
+
+ private final Map<InputStreamWrapper, StreamOperateEvent>
streamWrapperRecorder = new ConcurrentHashMap<>();
+
+ private final EventBus eventBus;
+
+ private final ScheduledExecutorService streamCheckExecutor;
+
+ private final Object lock = new Object();
+
+ private FileUploadStreamRecorder() {
+ eventBus = EventManager.getEventBus();
+ streamCheckExecutor = Executors.newScheduledThreadPool(1,
+ (t) -> new Thread(t, "upload-file-stream-check"));
+ startCheckRecordFileStream();
+ }
+
+ private void startCheckRecordFileStream() {
+ streamCheckExecutor.scheduleWithFixedDelay(this::checkRecordFileStream,
DEFAULT_STREAM_CHECK_INTERVAL,
+ getStreamCheckInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ public static FileUploadStreamRecorder getInstance() {
+ return RECORDER;
+ }
+
+ public void recordOpenStream(final InputStreamWrapper wrapper) {
+ checkAndRemoveOldestStream();
+ streamWrapperRecorder.put(wrapper, new StreamOperateEvent(wrapper));
+ }
+
+ private void checkAndRemoveOldestStream() {
+ int maxSize = getStreamRecorderMaxSize();
+ if (streamWrapperRecorder.size() < maxSize) {
+ return;
+ }
+ synchronized (lock) {
+ StreamOperateEvent oldestEvent =
getOldestOperateEvent(streamWrapperRecorder.values());
+ LOGGER.warn("reached record maxSize [{}] of file stream, delete oldest
stream, operate time [{}], stackTrace: ",
+ maxSize, oldestEvent.getOpenStreamTimestamp(),
oldestEvent.getInvokeStackTrace());
+ oldestEvent.setEventType(EventType.OVER_SIZE);
+ eventBus.post(oldestEvent);
+ closeStreamWrapper(oldestEvent.getInputStreamWrapper());
+ }
+ }
+
+ private StreamOperateEvent
getOldestOperateEvent(Collection<StreamOperateEvent> values) {
+ StreamOperateEvent oldestEvent = null;
+ for (StreamOperateEvent event : values) {
+ if (oldestEvent == null) {
+ oldestEvent = event;
+ continue;
+ }
+ if (oldestEvent.getOpenStreamTimestamp() >
event.getOpenStreamTimestamp()) {
+ oldestEvent = event;
+ }
+ }
+ return oldestEvent;
+ }
+
+ public void clearRecorder(InputStreamWrapper inputStreamWrapper) {
+ streamWrapperRecorder.remove(inputStreamWrapper);
+ }
+
+ private void checkRecordFileStream() {
+ try {
+ if (streamWrapperRecorder.isEmpty()) {
+ return;
+ }
+ List<StreamOperateEvent> overdueStreamEvents = new ArrayList<>();
+ long currentMillis = System.currentTimeMillis();
+ for (StreamOperateEvent event : streamWrapperRecorder.values()) {
+ long streamOperateTime = event.getOpenStreamTimestamp();
+ if (currentMillis - streamOperateTime >= getStreamMaxOpenTime()) {
+ overdueStreamEvents.add(event);
+ }
+ }
+ for (StreamOperateEvent overdueEvent : overdueStreamEvents) {
+ overdueEvent.setEventType(EventType.TIMEOUT);
+ eventBus.post(overdueEvent);
+ closeStreamWrapper(overdueEvent.getInputStreamWrapper());
+ LOGGER.warn("closed timeout stream, operate time [{}], operate
stackTrace: ",
+ overdueEvent.getOpenStreamTimestamp(),
overdueEvent.getInvokeStackTrace());
+ }
+ } catch (Exception e) {
+ LOGGER.error("checkRecordFileStream failed, next interval will try
again.", e);
+ }
+ }
+
+ private void closeStreamWrapper(InputStreamWrapper wrapper) {
+ try {
+ wrapper.close();
+ } catch (IOException e) {
+ LOGGER.error("closed input stream failed!", e);
+ }
+ }
+
+ private int getStreamRecorderMaxSize() {
+ return DynamicPropertyFactory.getInstance()
+ .getIntProperty(STREAM_RECORDER_MAX_SIZE,
DEFAULT_STREAM_RECORDER_MAX_SIZE).get();
+ }
+
+ private static boolean getStreamStackTraceEnabled() {
+ return
DynamicPropertyFactory.getInstance().getBooleanProperty(STREAM_STACKTRACE_ENABLED,
false).get();
+ }
+
+ private long getStreamCheckInterval() {
+ return DynamicPropertyFactory.getInstance()
+ .getLongProperty(STREAM_CHECK_INTERVAL,
DEFAULT_STREAM_CHECK_INTERVAL).get();
+ }
+
+ private long getStreamMaxOpenTime() {
+ return DynamicPropertyFactory.getInstance()
+ .getLongProperty(STREAM_MAX_OPEN_TIME,
DEFAULT_STREAM_MAX_OPEN_TIME).get();
+ }
+
+ public static class StreamOperateEvent {
+ private final InputStreamWrapper inputStreamWrapper;
+
+ private final long openStreamTimestamp;
+
+ private Exception invokeStackTrace;
+
+ private EventType eventType;
+
+ public StreamOperateEvent(InputStreamWrapper inputStreamWrapper) {
+ this.inputStreamWrapper = inputStreamWrapper;
+ if (getStreamStackTraceEnabled()) {
+ this.invokeStackTrace = new Exception();
+ }
+ this.openStreamTimestamp = System.currentTimeMillis();
+ }
+
+ public InputStreamWrapper getInputStreamWrapper() {
+ return inputStreamWrapper;
+ }
+
+ public Exception getInvokeStackTrace() {
+ return invokeStackTrace;
+ }
+
+ public long getOpenStreamTimestamp() {
+ return openStreamTimestamp;
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ public void setEventType(EventType eventType) {
+ this.eventType = eventType;
+ }
+ }
+
+ public enum EventType {
+ OVER_SIZE,
+ TIMEOUT
+ }
+}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java
new file mode 100644
index 000000000..3bf5889bc
--- /dev/null
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.foundation.vertx.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class InputStreamWrapper extends InputStream {
+ private final InputStream inputStream;
+
+ public InputStreamWrapper(InputStream inputStream) {
+ this.inputStream = inputStream;
+ FileUploadStreamRecorder.getInstance().recordOpenStream(this);
+ }
+
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return inputStream.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return inputStream.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return inputStream.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return inputStream.skip(n);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return inputStream.available();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return inputStream.markSupported();
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ inputStream.mark(readlimit);
+ }
+
+ @Override
+ public void close() throws IOException {
+ FileUploadStreamRecorder.getInstance().clearRecorder(this);
+ inputStream.close();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ inputStream.reset();
+ }
+}