This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 965bf2185 [KYUUBI #2573] [KPIP-4][SUB-TASK] Add a seekable buffered
reader for random access operation log
965bf2185 is described below
commit 965bf218503451293dafbc62ec2cf195be062a05
Author: ulysses-you <[email protected]>
AuthorDate: Sat May 7 10:03:42 2022 +0800
[KYUUBI #2573] [KPIP-4][SUB-TASK] Add a seekable buffered reader for random
access operation log
### _Why are the changes needed?_
A utility class that provides random (seek-based) access to operation log lines.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2573 from ulysses-you/seekable-reader.
Closes #2573
f75dac72 [ulysses-you] address comment
9239ea5f [ulysses-you] seek
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../operation/log/SeekableBufferedReader.scala | 84 +++++++++++++
.../kyuubi/log/SeekableBufferedReaderSuite.scala | 136 +++++++++++++++++++++
2 files changed, 220 insertions(+)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
new file mode 100644
index 000000000..4b1f76328
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation.log
+
+import java.io.{BufferedReader, Closeable, IOException}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path}
+
+/**
+ * A forward-only, line-oriented reader over an ordered sequence of files. The files are
+ * treated as one logical stream of lines: when the current file has no more lines, reading
+ * continues with the next path.
+ *
+ * Note that this reader only ever moves forward, so callers must create a new instance to
+ * replay lines that have already been consumed.
+ *
+ * @param paths files to read, in order, decoded as UTF-8; may be empty
+ */
+class SeekableBufferedReader(paths: Seq[Path]) extends Closeable {
+
+  private val bufferedReaders: Vector[BufferedReader] = openAll()
+
+  // Number of lines consumed from the underlying files so far, whether skipped or returned.
+  private var linePos = 0L
+  private var readerIndex = 0
+
+  /**
+   * Opens one reader per path. If any path fails to open, the readers opened before it are
+   * closed again so a partially constructed instance does not leak file handles.
+   */
+  private def openAll(): Vector[BufferedReader] = {
+    var opened = Vector.empty[BufferedReader]
+    try {
+      paths.foreach { path =>
+        opened :+= Files.newBufferedReader(path, StandardCharsets.UTF_8)
+      }
+      opened
+    } catch {
+      case e: Throwable =>
+        opened.foreach { reader =>
+          try reader.close()
+          catch { case suppressed: IOException => e.addSuppressed(suppressed) }
+        }
+        throw e
+    }
+  }
+
+  /**
+   * Consumes the next line of the logical stream, advancing to later files while the current
+   * one is exhausted. The index never moves past the last reader, so that reader is asked
+   * again on a later call.
+   *
+   * @return the line, or `None` when no reader has a line available
+   */
+  private def nextLine(): Option[String] = {
+    val lastIndex = bufferedReaders.length - 1
+    var line: Option[String] =
+      if (lastIndex < 0) None else Option(bufferedReaders(readerIndex).readLine())
+    while (line.isEmpty && readerIndex < lastIndex) {
+      readerIndex += 1
+      line = Option(bufferedReaders(readerIndex).readLine())
+    }
+    if (line.isDefined) linePos += 1
+    line
+  }
+
+  /**
+   * Returns an iterator over at most `limit` lines, starting at the zero-based line index
+   * `from` of the logical stream.
+   *
+   * Lines are read lazily as the iterator is consumed, so the iterator must be drained before
+   * this reader is closed. The read position is shared by every iterator created from the
+   * same instance: a `from` that lies before the lines already consumed cannot be honoured,
+   * and reading then simply resumes at the current position.
+   *
+   * @param from zero-based index of the first line to return (inclusive)
+   * @param limit maximum number of lines to return; non-positive values yield no lines
+   * @return a lazy iterator over the requested lines
+   * @throws IOException if `from` is negative
+   */
+  def readLine(from: Long, limit: Long): Iterator[String] = {
+    if (from < 0) throw new IOException("Negative seek offset")
+
+    new Iterator[String] {
+      private var emitted = 0L
+      // Line fetched by hasNext and not yet handed out by next(); keeps hasNext idempotent.
+      private var buffered: Option[String] = None
+      private var exhausted = false
+
+      override def hasNext: Boolean = {
+        if (buffered.isEmpty && !exhausted) {
+          if (emitted < limit) {
+            // Discard lines that precede the requested start offset.
+            var reachable = true
+            while (reachable && linePos < from) {
+              reachable = nextLine().isDefined
+            }
+            buffered = if (reachable) nextLine() else None
+          }
+          exhausted = buffered.isEmpty
+        }
+        buffered.isDefined
+      }
+
+      override def next(): String = {
+        val line = if (hasNext) buffered else None
+        line match {
+          case Some(value) =>
+            buffered = None
+            emitted += 1
+            value
+          case None =>
+            throw new NoSuchElementException("No more lines to read")
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes every underlying reader. All readers are attempted even if some fail; the first
+   * failure is rethrown with any later ones attached as suppressed exceptions.
+   */
+  override def close(): Unit = {
+    var failure: Option[IOException] = None
+    bufferedReaders.foreach { reader =>
+      try reader.close()
+      catch {
+        case e: IOException =>
+          failure match {
+            case Some(first) => first.addSuppressed(e)
+            case None => failure = Some(e)
+          }
+      }
+    }
+    failure.foreach(e => throw e)
+  }
+}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/log/SeekableBufferedReaderSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/log/SeekableBufferedReaderSuite.scala
new file mode 100644
index 000000000..4ee58c57f
--- /dev/null
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/log/SeekableBufferedReaderSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.log
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.operation.log.SeekableBufferedReader
+
+/**
+ * Tests for [[SeekableBufferedReader]] over one and several files. Each fixture file `fN`
+ * contains the integers `0 until N * 10`, one per line.
+ */
+class SeekableBufferedReaderSuite extends KyuubiFunSuite {
+
+  private val tmpDir = Utils.createTempDir()
+  private val f1 = tmpDir.resolve("f1")
+  private val f2 = tmpDir.resolve("f2")
+  private val f3 = tmpDir.resolve("f3")
+
+  /** Writes the integers `0 until numLines` to `path`, one per line. */
+  private def writeNumbers(path: java.nio.file.Path, numLines: Int): Unit = {
+    val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
+    // Close the writer even if a write fails so the file handle is not leaked.
+    try 0.until(numLines).foreach(i => writer.write(s"$i\n"))
+    finally writer.close()
+  }
+
+  /**
+   * Reads up to `limit` lines starting at line `from` through a fresh reader and parses them
+   * as integers. The result is materialized before the reader is closed, and the reader is
+   * closed even when reading or parsing fails.
+   */
+  private def readInts(paths: Seq[java.nio.file.Path], from: Long, limit: Long): List[Int] = {
+    val reader = new SeekableBufferedReader(paths)
+    try reader.readLine(from, limit).map(_.toInt).toList
+    finally reader.close()
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    writeNumbers(f1, 10)
+    writeNumbers(f2, 20)
+    writeNumbers(f3, 30)
+  }
+
+  override def afterAll(): Unit = {
+    // Always remove the fixture directory, even if the parent teardown throws.
+    try super.afterAll()
+    finally Utils.deleteDirectoryRecursively(tmpDir.toFile)
+  }
+
+  test("one file") {
+    assert(readInts(Seq(f1), 0, 5) == 0.until(5).toList)
+
+    // seek by from
+    assert(readInts(Seq(f1), 2, 3) == 2.until(5).toList)
+
+    // from + size > file lines
+    assert(readInts(Seq(f1), 4, 8) == 4.until(10).toList)
+
+    // from > file lines
+    assert(readInts(Seq(f1), 11, 1).isEmpty)
+  }
+
+  test("three files") {
+    val allFiles = Seq(f1, f2, f3)
+
+    // all of f1 plus the first line of f2
+    assert(readInts(allFiles, 0, 11) == (0.until(10).toList :+ 0))
+
+    // skips f1 and the first line of f2, then spills two lines into f3
+    assert(readInts(allFiles, 11, 21) == (1.until(20).toList ++ List(0, 1)))
+
+    // limit larger than the remaining lines returns everything that is left
+    val remaining = 5.until(10).toList ++ 0.until(20).toList ++ 0.until(30).toList
+    assert(remaining.size == 55)
+    assert(readInts(allFiles, 5, 100) == remaining)
+
+    // from beyond the total number of lines
+    assert(readInts(allFiles, 100, 100).isEmpty)
+  }
+}