This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new 18c52f3b42 KYLIN-5246 long running job's log staying in mem, may cause 
job server oom.
18c52f3b42 is described below

commit 18c52f3b42fcc8e1f92ae32c6e12f4ddf45fdf64
Author: zhaoliu4 <zhaol...@iflytek.com>
AuthorDate: Wed Nov 2 21:54:51 2022 +0800

    KYLIN-5246 long running job's log staying in mem, may cause job server oom.
    
    keep only the last 10 KB (10240 bytes) of log output, using a ring buffer
    
    add a unit test for RingBuffer.java
---
 .../kylin/common/util/CliCommandExecutor.java      | 19 +++---
 .../org/apache/kylin/common/util/RingBuffer.java   | 71 ++++++++++++++++++++++
 .../apache/kylin/common/util/RingBufferTest.java   | 60 ++++++++++++++++++
 3 files changed, 142 insertions(+), 8 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
 
b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index 83f30ab5d3..2f56d4f0c0 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -58,10 +58,11 @@ public class CliCommandExecutor {
     }
 
     public void copyFile(String localFile, String destDir) throws IOException {
-        if (remoteHost == null)
+        if (remoteHost == null) {
             copyNative(localFile, destDir);
-        else
+        } else {
             copyRemote(localFile, destDir);
+        }
     }
 
     private void copyNative(String localFile, String destDir) throws 
IOException {
@@ -93,11 +94,12 @@ public class CliCommandExecutor {
             r = runRemoteCommand(command, logAppender);
         }
 
-        if (r.getFirst() != 0)
-            throw new IOException("OS command error exit with return code: " + 
r.getFirst() //
+        if (r.getFirst() != 0) {
+            throw new IOException("OS command error exit with return code: " + 
r.getFirst()
                     + ", error message: " + r.getSecond() + "The command is: 
\n" + command
-                    + (remoteHost == null ? "" : " (remoteHost:" + remoteHost 
+ ")") //
+                    + (remoteHost == null ? "" : " (remoteHost:" + remoteHost 
+ ")")
             );
+        }
 
         return r;
     }
@@ -143,9 +145,10 @@ public class CliCommandExecutor {
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(proc.getInputStream(), 
StandardCharsets.UTF_8));
             String line;
-            StringBuilder result = new StringBuilder();
+            // keep only the last 10 KB of log output, to avoid a job server OOM.
+            RingBuffer ringBuffer = RingBuffer.allocate(10240);
             while ((line = reader.readLine()) != null && 
!Thread.currentThread().isInterrupted()) {
-                result.append(line).append('\n');
+                ringBuffer.put((line + '\n').getBytes(StandardCharsets.UTF_8));
                 if (logAppender != null) {
                     logAppender.log(line);
                 }
@@ -164,7 +167,7 @@ public class CliCommandExecutor {
 
             try {
                 int exitCode = proc.waitFor();
-                return Pair.newPair(exitCode, result.toString());
+                return Pair.newPair(exitCode, new String(ringBuffer.get(), 
StandardCharsets.UTF_8));
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new IOException(e);
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java 
b/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java
new file mode 100644
index 0000000000..8e0113cffb
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+/**
+ * @author zhaoliu4
+ * @date 2022/11/3
+ */
+public class RingBuffer {
+    private final byte[] bytes;
+
+    /**
+     * next write position
+     */
+    private int writePos;
+
+    /**
+     * number of bytes currently stored
+     */
+    private int size;
+
+    private RingBuffer(int capacity) {
+        this.bytes = new byte[capacity];
+    }
+
+    public static RingBuffer allocate(int capacity) {
+        return new RingBuffer(capacity);
+    }
+
+    public RingBuffer put(byte[] src) {
+        for (int i = 0; i < src.length; i++) {
+            if (writePos >= bytes.length) {
+                // wrap around: continue writing from the start of the buffer
+                writePos = 0;
+            }
+            bytes[writePos++] = src[i];
+            size = size < bytes.length ? size + 1 : size;
+        }
+        return this;
+    }
+
+    public byte[] get() {
+        byte[] res;
+        if (size == bytes.length && writePos < size) {
+            // the buffer has wrapped around: the oldest byte is at writePos
+            res = new byte[size];
+            System.arraycopy(bytes, writePos, res, 0, size - writePos);
+            System.arraycopy(bytes, 0, res, size - writePos, writePos);
+        } else {
+            res = new byte[writePos];
+            System.arraycopy(bytes, 0, res, 0, writePos);
+        }
+        return res;
+    }
+}
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java 
b/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java
new file mode 100644
index 0000000000..07a9c1f139
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @author zhaoliu4
+ * @date 2022/11/9
+ */
+public class RingBufferTest {
+    @Test
+    public void test() throws IOException {
+        String log1 = "2022-11-09 18:25:48,678 INFO  [task-result-getter-0] 
scheduler.TaskSetManager:54 : Starting task 0.0 in stage 384.0 (TID 621, 
kylin-hadoop-001, executor 7, partition 0, NODE_LOCAL, 7780 bytes)\n";
+        String log2 = "2022-11-09 18:25:48,678 INFO  
[dispatcher-event-loop-26] spark.MapOutputTrackerMasterEndpoint:54 : Asked to 
send map output locations for shuffle 138 to x.x.x.x:46334\n";
+        String log3 = "2022-11-09 18:25:48,689 INFO  [task-result-getter-0] 
scheduler.TaskSetManager:54 : Finished task 0.0 in stage 384.0 (TID 621) in 31 
ms on kylin-hadoop-001 (executor 7) (1/1)\n";
+        String log4 = "2022-11-09 18:25:48,689 INFO  [task-result-getter-0] 
cluster.YarnScheduler:54 : Removed TaskSet 384.0, whose tasks have all 
completed, from pool vip_tasks\n";
+
+        // mock yarn application log input stream
+        byte[] logBytes = (log1 + log2 + log3 + 
log4).getBytes(StandardCharsets.UTF_8);
+        try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(new ByteArrayInputStream(logBytes)))) {
+            String line = null;
+            // allocate 300B mem
+            RingBuffer ringBuffer = RingBuffer.allocate(300);
+            while ((line = reader.readLine()) != null) {
+                ringBuffer.put((line + '\n').getBytes(StandardCharsets.UTF_8));
+            }
+
+            // assert actual log data > 600B, but only keep the last 300B
+            Assert.assertTrue(logBytes.length > 600 && ringBuffer.get().length 
== 300);
+
+            Assert.assertEquals(" [task-result-getter-0] 
scheduler.TaskSetManager:54 : Finished task 0.0 in stage 384.0 (TID 621) in 31 
ms on kylin-hadoop-001 (executor 7) (1/1)\n" +
+                            "2022-11-09 18:25:48,689 INFO  
[task-result-getter-0] cluster.YarnScheduler:54 : Removed TaskSet 384.0, whose 
tasks have all completed, from pool vip_tasks\n",
+                    new String(ringBuffer.get(), StandardCharsets.UTF_8));
+        }
+    }
+}

Reply via email to