This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 965bf2185 [KYUUBI #2573] [KPIP-4][SUB-TASK] Add a seekable buffered 
reader for random access operation log
965bf2185 is described below

commit 965bf218503451293dafbc62ec2cf195be062a05
Author: ulysses-you <[email protected]>
AuthorDate: Sat May 7 10:03:42 2022 +0800

    [KYUUBI #2573] [KPIP-4][SUB-TASK] Add a seekable buffered reader for random 
access operation log
    
    ### _Why are the changes needed?_
    
    A util class for random access operation log
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2573 from ulysses-you/seekable-reader.
    
    Closes #2573
    
    f75dac72 [ulysses-you] address comment
    9239ea5f [ulysses-you] seek
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../operation/log/SeekableBufferedReader.scala     |  84 +++++++++++++
 .../kyuubi/log/SeekableBufferedReaderSuite.scala   | 136 +++++++++++++++++++++
 2 files changed, 220 insertions(+)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
new file mode 100644
index 000000000..4b1f76328
--- /dev/null
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation.log
+
+import java.io.{BufferedReader, Closeable, IOException}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path}
+
+/**
+ * A forward-only line reader over an ordered sequence of files. The files are treated as one
+ * logical stream: when the current file has no more lines, reading continues with the next one.
+ *
+ * A reader is opened for every path (as UTF-8) at construction time, and `paths` must be
+ * non-empty because the first reader is selected eagerly.
+ *
+ * Note that the read position and the line counters are kept per instance and only ever move
+ * forwards, so callers should always create a new instance if they want to replay lines which
+ * have already been fetched.
+ */
+class SeekableBufferedReader(paths: Seq[Path]) extends Closeable {
+
+  private val bufferedReaders: Seq[BufferedReader] = paths.map { path =>
+    Files.newBufferedReader(path, StandardCharsets.UTF_8)
+  }
+
+  // Number of leading lines skipped so far while seeking to `from`.
+  private var linePos = 0L
+  // Number of fetch attempts made through `readLine` iterators; compared against `limit`.
+  private var numLines = 0L
+  private var readerIndex = 0
+  private val numReaders = bufferedReaders.length
+  private var currentReader = bufferedReaders.head
+  // The most recently read line, or null once every file has been exhausted.
+  private var currentValue: String = _
+
+  /**
+   * Reads the next line into `currentValue`, moving on to the following files while the current
+   * one is exhausted. Leaves `currentValue` as null when no file has a line left.
+   */
+  private def nextLine(): Unit = {
+    currentValue = currentReader.readLine()
+    while (currentValue == null && readerIndex < numReaders - 1) {
+      readerIndex += 1
+      currentReader = bufferedReaders(readerIndex)
+      currentValue = currentReader.readLine()
+    }
+  }
+
+  /**
+   * Returns an iterator over at most `limit` lines, starting at the line with index `from`.
+   * Line indices are zero-based and counted across all files in order.
+   *
+   * The iterator honours the usual `Iterator` contract: `hasNext` may be called any number of
+   * times without consuming a line, and `next()` on an exhausted iterator throws
+   * `NoSuchElementException`.
+   *
+   * @param from zero-based index of the first line to return (inclusive)
+   * @param limit maximum number of lines to return
+   * @throws IOException if `from` is negative
+   */
+  def readLine(from: Long, limit: Long): Iterator[String] = {
+    if (from < 0) throw new IOException("Negative seek offset")
+
+    new Iterator[String] {
+      // True while `currentValue` holds a line that has been fetched but not yet handed out.
+      // Without this flag every `hasNext` call would read (and drop) another line.
+      private var fetched = false
+
+      override def hasNext: Boolean = {
+        if (fetched) {
+          true
+        } else if (numLines >= limit) {
+          false
+        } else {
+          nextLine()
+          // Skip forward until the line at index `from` is the current one (or input ends).
+          while (linePos < from && currentValue != null) {
+            nextLine()
+            linePos += 1
+          }
+          numLines += 1
+          fetched = currentValue != null
+          fetched
+        }
+      }
+
+      override def next(): String = {
+        if (!hasNext) throw new NoSuchElementException("No more lines to read")
+        fetched = false
+        currentValue
+      }
+    }
+  }
+
+  /** Closes the readers of all underlying files. */
+  override def close(): Unit = {
+    bufferedReaders.foreach(_.close())
+  }
+}
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/log/SeekableBufferedReaderSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/log/SeekableBufferedReaderSuite.scala
new file mode 100644
index 000000000..4ee58c57f
--- /dev/null
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/log/SeekableBufferedReaderSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.log
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.operation.log.SeekableBufferedReader
+
+/**
+ * Tests for [[SeekableBufferedReader]]. Three fixture files are written once per suite: `f1`,
+ * `f2` and `f3` contain the numbers `0 until 10`, `0 until 20` and `0 until 30` respectively,
+ * one number per line.
+ */
+class SeekableBufferedReaderSuite extends KyuubiFunSuite {
+
+  private val tmpDir = Utils.createTempDir()
+  private val f1 = tmpDir.resolve("f1")
+  private val f2 = tmpDir.resolve("f2")
+  private val f3 = tmpDir.resolve("f3")
+
+  /** Writes the numbers `0 until numLines` to `path`, one per line. */
+  private def writeNumbers(path: java.nio.file.Path, numLines: Int): Unit = {
+    val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
+    try {
+      0.until(numLines).foreach { i => writer.write(s"$i\n") }
+    } finally {
+      // close() flushes the buffered content and releases the file even if a write failed
+      writer.close()
+    }
+  }
+
+  /**
+   * Reads up to `limit` lines starting at line index `from` from a fresh reader over `paths`
+   * and parses each line as an Int. The reader is closed even when reading or parsing fails.
+   */
+  private def readNumbers(
+      paths: Seq[java.nio.file.Path],
+      from: Long,
+      limit: Long): Seq[Int] = {
+    val reader = new SeekableBufferedReader(paths)
+    try {
+      val lines = reader.readLine(from, limit)
+      val numbers = Vector.newBuilder[Int]
+      // Drain eagerly with exactly one hasNext per next(), before the reader is closed
+      while (lines.hasNext) {
+        numbers += lines.next().toInt
+      }
+      numbers.result()
+    } finally {
+      reader.close()
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    writeNumbers(f1, 10)
+    writeNumbers(f2, 20)
+    writeNumbers(f3, 30)
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    Utils.deleteDirectoryRecursively(tmpDir.toFile)
+  }
+
+  test("one file") {
+    val fromStart = readNumbers(f1 :: Nil, 0, 5)
+    assert(fromStart.size == 5)
+    assert(fromStart == 0.until(5))
+
+    // seek by from
+    val seeked = readNumbers(f1 :: Nil, 2, 3)
+    assert(seeked.size == 3)
+    assert(seeked == 2.until(5))
+
+    // from + size > file lines
+    val truncated = readNumbers(f1 :: Nil, 4, 8)
+    assert(truncated.size == 6)
+    assert(truncated == 4.until(10))
+
+    // from > file lines
+    val beyondEnd = readNumbers(f1 :: Nil, 11, 1)
+    assert(beyondEnd.isEmpty)
+  }
+
+  test("three files") {
+    val files = f1 :: f2 :: f3 :: Nil
+
+    // all of f1 plus the first line of f2
+    val acrossFirstBoundary = readNumbers(files, 0, 11)
+    assert(acrossFirstBoundary.size == 11)
+    assert(acrossFirstBoundary == (0.until(10) :+ 0))
+
+    // starts at the second line of f2 and ends at the second line of f3
+    val acrossSecondBoundary = readNumbers(files, 11, 21)
+    assert(acrossSecondBoundary.size == 21)
+    assert(acrossSecondBoundary == (1.until(20) ++ Seq(0, 1)))
+
+    // limit larger than the remaining lines: everything from line 5 onwards
+    val toTheEnd = readNumbers(files, 5, 100)
+    assert(toTheEnd.size == 55)
+    assert(toTheEnd == (5.until(10) ++ 0.until(20) ++ 0.until(30)))
+
+    // from beyond the total number of lines
+    val beyondEnd = readNumbers(files, 100, 100)
+    assert(beyondEnd.isEmpty)
+  }
+}

Reply via email to