mapleFU commented on code in PR #1798:
URL: https://github.com/apache/kvrocks/pull/1798#discussion_r1356071687


##########
src/common/rdb_stream.cc:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "rdb_stream.h"
+
+#include "fmt/format.h"
+#include "vendor/crc64.h"
+#include "vendor/endianconv.h"
+
+StatusOr<size_t> RdbStringStream::Read(char *buf, size_t n) {
+  if (pos_ + n > input_.size()) {
+    return {Status::NotOK, "unexpected EOF"};
+  }
+  memcpy(buf, input_.data() + pos_, n);
+  pos_ += n;
+  return n;
+}
+
+StatusOr<uint64_t> RdbStringStream::GetCheckSum() const {
+  if (input_.size() < 8) {
+    return {Status::NotOK, "invalid payload length"};
+  }
+  uint64_t crc = crc64(0, reinterpret_cast<const unsigned char 
*>(input_.data()), input_.size() - 8);
+  memrev64ifbe(&crc);
+  return crc;
+}
+
+Status RdbFileStream::Open() {
+  ifs_.open(file_name_, std::ifstream::in | std::ifstream::binary);
+  if (!ifs_.is_open()) {
+    return {Status::NotOK, fmt::format("failed to open rdb file: '{}': {}", 
file_name_, strerror(errno))};
+  }
+
+  return Status::OK();
+}
+
+StatusOr<size_t> RdbFileStream::Read(char *buf, size_t len) {
+  size_t n = 0;
+  while (len) {
+    size_t read_bytes = max_read_chunk_size_ < len ? max_read_chunk_size_ : 
len;
+    ifs_.read(buf, static_cast<std::streamsize>(read_bytes));
+    if (!ifs_.good()) {
+      return Status(Status::NotOK, fmt::format("read failed: {}:", 
strerror(errno)));
+    }
+    check_sum_ = crc64(check_sum_, (const unsigned char *)buf, read_bytes);

Review Comment:
   using `static_cast` for  `buf`?



##########
src/common/rdb_stream.cc:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "rdb_stream.h"
+
+#include "fmt/format.h"
+#include "vendor/crc64.h"
+#include "vendor/endianconv.h"
+
+StatusOr<size_t> RdbStringStream::Read(char *buf, size_t n) {
+  if (pos_ + n > input_.size()) {
+    return {Status::NotOK, "unexpected EOF"};
+  }
+  memcpy(buf, input_.data() + pos_, n);
+  pos_ += n;
+  return n;
+}
+
+StatusOr<uint64_t> RdbStringStream::GetCheckSum() const {
+  if (input_.size() < 8) {
+    return {Status::NotOK, "invalid payload length"};
+  }
+  uint64_t crc = crc64(0, reinterpret_cast<const unsigned char 
*>(input_.data()), input_.size() - 8);
+  memrev64ifbe(&crc);

Review Comment:
   It's a bit tricky here. Should we `memrev64ifbe` for the crc64 of the Stream?



##########
src/common/rdb_stream.cc:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "rdb_stream.h"
+
+#include "fmt/format.h"
+#include "vendor/crc64.h"
+#include "vendor/endianconv.h"
+
+StatusOr<size_t> RdbStringStream::Read(char *buf, size_t n) {
+  if (pos_ + n > input_.size()) {
+    return {Status::NotOK, "unexpected EOF"};
+  }
+  memcpy(buf, input_.data() + pos_, n);
+  pos_ += n;
+  return n;
+}
+
+StatusOr<uint64_t> RdbStringStream::GetCheckSum() const {
+  if (input_.size() < 8) {
+    return {Status::NotOK, "invalid payload length"};
+  }
+  uint64_t crc = crc64(0, reinterpret_cast<const unsigned char 
*>(input_.data()), input_.size() - 8);
+  memrev64ifbe(&crc);
+  return crc;
+}
+
+Status RdbFileStream::Open() {
+  ifs_.open(file_name_, std::ifstream::in | std::ifstream::binary);
+  if (!ifs_.is_open()) {
+    return {Status::NotOK, fmt::format("failed to open rdb file: '{}': {}", 
file_name_, strerror(errno))};
+  }
+
+  return Status::OK();
+}
+
+StatusOr<size_t> RdbFileStream::Read(char *buf, size_t len) {
+  size_t n = 0;
+  while (len) {
+    size_t read_bytes = max_read_chunk_size_ < len ? max_read_chunk_size_ : 
len;
+    ifs_.read(buf, static_cast<std::streamsize>(read_bytes));

Review Comment:
   https://en.cppreference.com/w/cpp/io/basic_istream/read
   Do we need to handle the exception here?



##########
src/common/rdb_stream.h:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <stdint.h>
+
+#include <fstream>
+#include <string>
+
+#include "status.h"
+
+class RdbStream {
+ public:
+  RdbStream() = default;
+  virtual ~RdbStream() = default;
+
+  virtual StatusOr<size_t> Read(char *buf, size_t len) = 0;
+  virtual StatusOr<uint64_t> GetCheckSum() const = 0;
+  StatusOr<uint8_t> ReadByte() {
+    uint8_t value = 0;
+    auto s = Read(reinterpret_cast<char *>(&value), 1);
+    if (!s.IsOK()) {
+      return s;
+    }
+    return value;
+  }
+};
+
+class RdbStringStream : public RdbStream {
+ public:
+  explicit RdbStringStream(std::string_view input) : input_(input){};
+  RdbStringStream(const RdbStringStream &) = delete;
+  RdbStringStream &operator=(const RdbStringStream &) = delete;
+  ~RdbStringStream() override = default;
+
+  StatusOr<size_t> Read(char *buf, size_t len) override;
+  StatusOr<uint64_t> GetCheckSum() const override;
+
+ private:
+  std::string input_;
+  size_t pos_ = 0;
+};
+
+class RdbFileStream : public RdbStream {
+ public:
+  explicit RdbFileStream(std::string file_name, size_t chunk_size = 1024 * 
1024)
+      : file_name_(std::move(file_name)), check_sum_(0), total_read_bytes_(0), 
max_read_chunk_size_(chunk_size){};
+  RdbFileStream(const RdbFileStream &) = delete;
+  RdbFileStream &operator=(const RdbFileStream &) = delete;
+  ~RdbFileStream() override = default;
+
+  Status Open();
+  StatusOr<size_t> Read(char *buf, size_t len) override;
+  StatusOr<uint64_t> GetCheckSum() const override { return check_sum_; }

Review Comment:
   Actually I think `GetCheckSum()` here is a bit tricky. Can we use 
`std::optional<uint64_t> check_sum_` as member, and return `Error` when 
`check_sum_` is unset?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to