This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new 18c52f3b42 KYLIN-5246 long running job's log staying in mem, may cause job server oom. 18c52f3b42 is described below commit 18c52f3b42fcc8e1f92ae32c6e12f4ddf45fdf64 Author: zhaoliu4 <zhaol...@iflytek.com> AuthorDate: Wed Nov 2 21:54:51 2022 +0800 KYLIN-5246 long running job's log staying in mem, may cause job server oom. keep only the last 10k logs by ring buffer add a UT for the [RingBuffer.java] --- .../kylin/common/util/CliCommandExecutor.java | 19 +++--- .../org/apache/kylin/common/util/RingBuffer.java | 71 ++++++++++++++++++++++ .../apache/kylin/common/util/RingBufferTest.java | 60 ++++++++++++++++++ 3 files changed, 142 insertions(+), 8 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java index 83f30ab5d3..2f56d4f0c0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java @@ -58,10 +58,11 @@ public class CliCommandExecutor { } public void copyFile(String localFile, String destDir) throws IOException { - if (remoteHost == null) + if (remoteHost == null) { copyNative(localFile, destDir); - else + } else { copyRemote(localFile, destDir); + } } private void copyNative(String localFile, String destDir) throws IOException { @@ -93,11 +94,12 @@ public class CliCommandExecutor { r = runRemoteCommand(command, logAppender); } - if (r.getFirst() != 0) - throw new IOException("OS command error exit with return code: " + r.getFirst() // + if (r.getFirst() != 0) { + throw new IOException("OS command error exit with return code: " + r.getFirst() + ", error message: " + r.getSecond() + "The command is: \n" + command - + (remoteHost == null ? "" : " (remoteHost:" + remoteHost + ")") // + + (remoteHost == null ? "" : " (remoteHost:" + remoteHost + ")") ); + } return r; } @@ -143,9 +145,10 @@ public class CliCommandExecutor { BufferedReader reader = new BufferedReader( new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8)); String line; - StringBuilder result = new StringBuilder(); + // keep only the last 10k logs, to avoid cause job server oom. + RingBuffer ringBuffer = RingBuffer.allocate(10240); while ((line = reader.readLine()) != null && !Thread.currentThread().isInterrupted()) { - result.append(line).append('\n'); + ringBuffer.put((line + '\n').getBytes(StandardCharsets.UTF_8)); if (logAppender != null) { logAppender.log(line); } @@ -164,7 +167,7 @@ public class CliCommandExecutor { try { int exitCode = proc.waitFor(); - return Pair.newPair(exitCode, result.toString()); + return Pair.newPair(exitCode, new String(ringBuffer.get(), StandardCharsets.UTF_8)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); diff --git a/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java b/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java new file mode 100644 index 0000000000..8e0113cffb --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.util; + +/** + * @author zhaoliu4 + * @date 2022/11/3 + */ +public class RingBuffer { + private final byte[] bytes; + + /** + * next write position + */ + private int writePos; + + /** + * number of bytes has stored + */ + private int size; + + private RingBuffer(int capacity) { + this.bytes = new byte[capacity]; + } + + public static RingBuffer allocate(int capacity) { + return new RingBuffer(capacity); + } + + public RingBuffer put(byte[] src) { + for (int i = 0; i < src.length; i++) { + if (writePos >= bytes.length) { + // reset, turn back continue to write + writePos = 0; + } + bytes[writePos++] = src[i]; + size = size < bytes.length ? size + 1 : size; + } + return this; + } + + public byte[] get() { + byte[] res; + if (size == bytes.length && writePos < size) { + // occur turn back + res = new byte[size]; + System.arraycopy(bytes, writePos, res, 0, size - writePos); + System.arraycopy(bytes, 0, res, size - writePos, writePos); + } else { + res = new byte[writePos]; + System.arraycopy(bytes, 0, res, 0, writePos); + } + return res; + } +} diff --git a/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java b/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java new file mode 100644 index 0000000000..07a9c1f139 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +/** + * @author zhaoliu4 + * @date 2022/11/9 + */ +public class RingBufferTest { + @Test + public void test() throws IOException { + String log1 = "2022-11-09 18:25:48,678 INFO [task-result-getter-0] scheduler.TaskSetManager:54 : Starting task 0.0 in stage 384.0 (TID 621, kylin-hadoop-001, executor 7, partition 0, NODE_LOCAL, 7780 bytes)\n"; + String log2 = "2022-11-09 18:25:48,678 INFO [dispatcher-event-loop-26] spark.MapOutputTrackerMasterEndpoint:54 : Asked to send map output locations for shuffle 138 to x.x.x.x:46334\n"; + String log3 = "2022-11-09 18:25:48,689 INFO [task-result-getter-0] scheduler.TaskSetManager:54 : Finished task 0.0 in stage 384.0 (TID 621) in 31 ms on kylin-hadoop-001 (executor 7) (1/1)\n"; + String log4 = "2022-11-09 18:25:48,689 INFO [task-result-getter-0] cluster.YarnScheduler:54 : Removed TaskSet 384.0, whose tasks have all completed, from pool vip_tasks\n"; + + // mock yarn application log input stream + byte[] logBytes = (log1 + log2 + log3 + log4).getBytes(StandardCharsets.UTF_8); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(logBytes)))) { + String line = null; + // allocate 300B mem + RingBuffer ringBuffer = RingBuffer.allocate(300); + while ((line = reader.readLine()) != null) { + ringBuffer.put((line + '\n').getBytes(StandardCharsets.UTF_8)); + } + + // assert actual log data > 600B, but only keep the last 300B + Assert.assertTrue(logBytes.length > 600 && ringBuffer.get().length == 300); + + Assert.assertEquals(" [task-result-getter-0] scheduler.TaskSetManager:54 : Finished task 0.0 in stage 384.0 (TID 621) in 31 ms on kylin-hadoop-001 (executor 7) (1/1)\n" + + "2022-11-09 18:25:48,689 INFO [task-result-getter-0] cluster.YarnScheduler:54 : Removed TaskSet 384.0, whose tasks have all completed, from pool vip_tasks\n", + new String(ringBuffer.get(), StandardCharsets.UTF_8)); + } + } +}