This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 901c7718 Support for the BITFIELD command (#1901)
901c7718 is described below

commit 901c771872173aafdd95d2dee1f3aef448608e3b
Author: julic20s <[email protected]>
AuthorDate: Sun Dec 3 23:27:28 2023 +0800

    Support for the BITFIELD command (#1901)
    
    Co-authored-by: Twice <[email protected]>
---
 src/cluster/slot_migrate.cc                        |  41 ++-
 src/cluster/slot_migrate.h                         |   2 +-
 src/commands/cmd_bit.cc                            | 153 ++++++++-
 src/commands/command_parser.h                      |  32 ++
 src/common/bitfield_util.cc                        | 124 +++++++
 src/common/bitfield_util.h                         | 316 ++++++++++++++++++
 src/common/lock_manager.h                          |  12 +-
 src/common/parse_util.h                            |  14 +-
 src/storage/batch_extractor.cc                     |   4 +
 src/storage/redis_metadata.h                       |   1 +
 src/types/redis_bitmap.cc                          | 346 ++++++++++++++++++-
 src/types/redis_bitmap.h                           |  24 ++
 src/types/redis_bitmap_string.cc                   |  85 +++++
 src/types/redis_bitmap_string.h                    |   7 +
 tests/cppunit/bitfield_util.cc                     |  48 +++
 tests/cppunit/types/bitmap_test.cc                 | 369 ++++++++++++++++++++-
 .../integration/slotmigrate/slotmigrate_test.go    |  12 +
 tests/gocase/unit/type/bitmap/bitmap_test.go       |  10 +
 18 files changed, 1573 insertions(+), 27 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 3dc1f225..09b220ae 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -474,8 +474,8 @@ Status SlotMigrator::checkSingleResponse(int sock_fd) { 
return checkMultipleResp
 
 // Commands  |  Response            |  Instance
 // ++++++++++++++++++++++++++++++++++++++++
-// set          Redis::Integer         :1/r/n
-// hset         Redis::SimpleString    +OK/r/n
+// set          Redis::Integer         :1\r\n
+// hset         Redis::SimpleString    +OK\r\n
 // sadd         Redis::Integer
 // zadd         Redis::Integer
 // siadd        Redis::Integer
@@ -497,6 +497,7 @@ Status SlotMigrator::checkSingleResponse(int sock_fd) { 
return checkMultipleResp
 // sirem        Redis::Integer
 // del          Redis::Integer
 // xadd         Redis::BulkString
+// bitfield     Redis::Array           *1\r\n:0
 Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) {
   if (sock_fd < 0 || total <= 0) {
     return {Status::NotOK, fmt::format("invalid arguments: sock_fd={}, 
count={}", sock_fd, total)};
@@ -509,7 +510,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, 
int total) {
   setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
 
   // Start checking response
-  size_t bulk_len = 0;
+  size_t bulk_or_array_len = 0;
   int cnt = 0;
   parser_state_ = ParserState::ArrayLen;
   UniqueEvbuf evbuf;
@@ -534,14 +535,20 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, 
int total) {
 
           if (line[0] == '-') {
             return {Status::NotOK, fmt::format("got invalid response of length 
{}: {}", line.length, line.get())};
-          } else if (line[0] == '$') {
+          } else if (line[0] == '$' || line[0] == '*') {
             auto parse_result = ParseInt<uint64_t>(std::string(line.get() + 1, 
line.length - 1), 10);
             if (!parse_result) {
               return {Status::NotOK, "protocol error: expected integer value"};
             }
 
-            bulk_len = *parse_result;
-            parser_state_ = bulk_len > 0 ? ParserState::BulkData : 
ParserState::OneRspEnd;
+            bulk_or_array_len = *parse_result;
+            if (bulk_or_array_len <= 0) {
+              parser_state_ = ParserState::OneRspEnd;
+            } else if (line[0] == '$') {
+              parser_state_ = ParserState::BulkData;
+            } else {
+              parser_state_ = ParserState::ArrayData;
+            }
           } else if (line[0] == '+' || line[0] == ':') {
             parser_state_ = ParserState::OneRspEnd;
           } else {
@@ -552,17 +559,33 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, 
int total) {
         }
         // Handle bulk string response
         case ParserState::BulkData: {
-          if (evbuffer_get_length(evbuf.get()) < bulk_len + 2) {
+          if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) {
             LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, 
read socket again";
             run = false;
             break;
           }
           // TODO(chrisZMF): Check tail '\r\n'
-          evbuffer_drain(evbuf.get(), bulk_len + 2);
-          bulk_len = 0;
+          evbuffer_drain(evbuf.get(), bulk_or_array_len + 2);
+          bulk_or_array_len = 0;
           parser_state_ = ParserState::OneRspEnd;
           break;
         }
+        case ParserState::ArrayData: {
+          while (run && bulk_or_array_len > 0) {
+            evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr, 
nullptr, EVBUFFER_EOL_CRLF_STRICT);
+            if (ptr.pos < 0) {
+              LOG(INFO) << "[migrate] Array data in event buffer is not 
complete, read socket again";
+              run = false;
+              break;
+            }
+            evbuffer_drain(evbuf.get(), ptr.pos + 2);
+            --bulk_or_array_len;
+          }
+          if (run) {
+            parser_state_ = ParserState::OneRspEnd;
+          }
+          break;
+        }
         case ParserState::OneRspEnd: {
           cnt++;
           if (cnt >= total) {
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 4ac00ad5..9aad1dcc 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -138,7 +138,7 @@ class SlotMigrator : public redis::Database {
 
   void resumeSyncCtx(const Status &migrate_result);
 
-  enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd };
+  enum class ParserState { ArrayLen, BulkLen, BulkData, ArrayData, OneRspEnd };
   enum class ThreadState { Uninitialized, Running, Terminated };
 
   static const int kDefaultMaxPipelineSize = 16;
diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc
index f0a25d39..c1d72462 100644
--- a/src/commands/cmd_bit.cc
+++ b/src/commands/cmd_bit.cc
@@ -19,6 +19,7 @@
  */
 
 #include "commander.h"
+#include "commands/command_parser.h"
 #include "error_constants.h"
 #include "server/server.h"
 #include "types/redis_bitmap.h"
@@ -132,6 +133,8 @@ class CommandBitCount : public Commander {
 
 class CommandBitPos : public Commander {
  public:
+  using Commander::Parse;
+
   Status Parse(const std::vector<std::string> &args) override {
     if (args.size() >= 4) {
       auto parse_start = ParseInt<int64_t>(args[3], 10);
@@ -225,10 +228,158 @@ class CommandBitOp : public Commander {
   BitOpFlags op_flag_;
 };
 
+class CommandBitfield : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    BitfieldOperation cmd;
+
+    read_only_ = true;
+    // BITFIELD <key> [commands...]
+    for (CommandParser group(args, 2); group.Good();) {
+      auto remains = group.Remains();
+
+      std::string opcode = util::ToLower(group[0]);
+      if (opcode == "get") {
+        cmd.type = BitfieldOperation::Type::kGet;
+      } else if (opcode == "set") {
+        cmd.type = BitfieldOperation::Type::kSet;
+        read_only_ = false;
+      } else if (opcode == "incrby") {
+        cmd.type = BitfieldOperation::Type::kIncrBy;
+        read_only_ = false;
+      } else if (opcode == "overflow") {
+        constexpr auto kOverflowCmdSize = 2;
+        if (remains < kOverflowCmdSize) {
+          return {Status::RedisParseErr, errWrongNumOfArguments};
+        }
+        auto s = parseOverflowSubCommand(group[1], &cmd);
+        if (!s.IsOK()) {
+          return s;
+        }
+
+        group.Skip(kOverflowCmdSize);
+        continue;
+      } else {
+        return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
+      }
+
+      if (remains < 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      // parse encoding
+      auto encoding = parseBitfieldEncoding(group[1]);
+      if (!encoding.IsOK()) {
+        return encoding.ToStatus();
+      }
+      cmd.encoding = encoding.GetValue();
+
+      // parse offset
+      if (!GetBitOffsetFromArgument(group[2], &cmd.offset).IsOK()) {
+        return {Status::RedisParseErr, "bit offset is not an integer or out of 
range"};
+      }
+
+      if (cmd.type != BitfieldOperation::Type::kGet) {
+        if (remains < 4) {
+          return {Status::RedisParseErr, errWrongNumOfArguments};
+        }
+
+        auto value = ParseInt<int64_t>(group[3], 10);
+        if (!value.IsOK()) {
+          return value.ToStatus();
+        }
+        cmd.value = value.GetValue();
+
+        // SET|INCRBY <encoding> <offset> <value>
+        group.Skip(4);
+      } else {
+        // GET <encoding> <offset>
+        group.Skip(3);
+      }
+
+      cmds_.push_back(cmd);
+    }
+
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
+    std::vector<std::optional<BitfieldValue>> rets;
+    rocksdb::Status s;
+    if (read_only_) {
+      s = bitmap_db.BitfieldReadOnly(args_[1], cmds_, &rets);
+    } else {
+      s = bitmap_db.Bitfield(args_[1], cmds_, &rets);
+    }
+    std::vector<std::string> str_rets(rets.size());
+    for (size_t i = 0; i != rets.size(); ++i) {
+      if (rets[i].has_value()) {
+        if (rets[i]->Encoding().IsSigned()) {
+          str_rets[i] = 
redis::Integer(CastToSignedWithoutBitChanges(rets[i]->Value()));
+        } else {
+          str_rets[i] = redis::Integer(rets[i]->Value());
+        }
+      } else {
+        str_rets[i] = redis::NilString();
+      }
+    }
+    *output = redis::Array(str_rets);
+    return Status::OK();
+  }
+
+ private:
+  static Status parseOverflowSubCommand(const std::string &overflow, 
BitfieldOperation *cmd) {
+    std::string lower = util::ToLower(overflow);
+    if (lower == "wrap") {
+      cmd->overflow = BitfieldOverflowBehavior::kWrap;
+    } else if (lower == "sat") {
+      cmd->overflow = BitfieldOverflowBehavior::kSat;
+    } else if (lower == "fail") {
+      cmd->overflow = BitfieldOverflowBehavior::kFail;
+    } else {
+      return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
+    }
+    return Status::OK();
+  }
+
+  static StatusOr<BitfieldEncoding> parseBitfieldEncoding(const std::string 
&token) {
+    if (token.empty()) {
+      return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
+    }
+
+    auto sign = std::tolower(token[0]);
+    if (sign != 'u' && sign != 'i') {
+      return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
+    }
+
+    auto type = BitfieldEncoding::Type::kUnsigned;
+    if (sign == 'i') {
+      type = BitfieldEncoding::Type::kSigned;
+    }
+
+    auto bits_parse = ParseInt<uint8_t>(token.substr(1), 10);
+    if (!bits_parse.IsOK()) {
+      return bits_parse.ToStatus();
+    }
+    uint8_t bits = bits_parse.GetValue();
+
+    auto encoding = BitfieldEncoding::Create(type, bits);
+    if (!encoding.IsOK()) {
+      return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
+    }
+    return encoding.GetValue();
+  }
+
+  std::vector<BitfieldOperation> cmds_;
+  bool read_only_;
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandGetBit>("getbit", 3, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandSetBit>("setbit", 4, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandBitCount>("bitcount", -2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandBitPos>("bitpos", -3, "read-only", 
1, 1, 1),
-                        MakeCmdAttr<CommandBitOp>("bitop", -4, "write", 2, -1, 
1), )
+                        MakeCmdAttr<CommandBitOp>("bitop", -4, "write", 2, -1, 
1),
+                        MakeCmdAttr<CommandBitfield>("bitfield", -2, "write", 
1, 1, 1), )
 
 }  // namespace redis
diff --git a/src/commands/command_parser.h b/src/commands/command_parser.h
index 483778e2..3aa31ae0 100644
--- a/src/commands/command_parser.h
+++ b/src/commands/command_parser.h
@@ -41,6 +41,9 @@ struct CommandParser {
  public:
   using ValueType = typename Iter::value_type;
 
+  static constexpr bool IsRandomAccessIter =
+      std::is_base_of_v<std::random_access_iterator_tag, typename 
std::iterator_traits<Iter>::iterator_category>;
+
   CommandParser(Iter begin, Iter end) : begin_(std::move(begin)), 
end_(std::move(end)) {}
 
   template <typename Container>
@@ -56,12 +59,41 @@ struct CommandParser {
 
   decltype(auto) RawPeek() const { return *begin_; }
 
+  decltype(auto) operator[](size_t index) const {
+    Iter iter = begin_;
+    std::advance(iter, index);
+    return *iter;
+  }
+
   decltype(auto) RawTake() { return *begin_++; }
 
   decltype(auto) RawNext() { ++begin_; }
 
   bool Good() const { return begin_ != end_; }
 
+  std::enable_if_t<IsRandomAccessIter, size_t> Remains() const {
+    // O(1) iff Iter is random access iterator.
+    auto d = std::distance(begin_, end_);
+    DCHECK(d >= 0);
+    return d;
+  }
+
+  size_t Skip(size_t count) {
+    if constexpr (IsRandomAccessIter) {
+      size_t steps = std::min(Remains(), count);
+      begin_ += steps;
+      return steps;
+    } else {
+      size_t steps = 0;
+      while (count != 0 && Good()) {
+        ++begin_;
+        ++steps;
+        --count;
+      }
+      return steps;
+    }
+  }
+
   template <typename Pred>
   bool EatPred(Pred&& pred) {
     if (Good() && std::forward<Pred>(pred)(RawPeek())) {
diff --git a/src/common/bitfield_util.cc b/src/common/bitfield_util.cc
new file mode 100644
index 00000000..bd8a549f
--- /dev/null
+++ b/src/common/bitfield_util.cc
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "bitfield_util.h"
+
+namespace detail {
+
+static uint64_t WrappedSignedBitfieldPlus(uint64_t value, int64_t incr, 
uint8_t bits) {
+  uint64_t res = value + static_cast<uint64_t>(incr);
+  if (bits < 64) {
+    auto mask = std::numeric_limits<uint64_t>::max() << bits;
+    if ((res & (1 << (bits - 1))) != 0) {
+      res |= mask;
+    } else {
+      res &= ~mask;
+    }
+  }
+  return res;
+}
+
+// See also 
https://github.com/redis/redis/blob/7f4bae817614988c43c3024402d16edcbf3b3277/src/bitops.c#L325
+StatusOr<bool> SignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, 
BitfieldOverflowBehavior overflow,
+                                  uint64_t *dst) {
+  Status 
bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kSigned,
 bits));
+  if (!bits_status) {
+    return bits_status;
+  }
+
+  auto max = std::numeric_limits<int64_t>::max();
+  if (bits != 64) {
+    max = (static_cast<int64_t>(1) << (bits - 1)) - 1;
+  }
+  int64_t min = -max - 1;
+
+  int64_t signed_value = CastToSignedWithoutBitChanges(value);
+  int64_t max_incr = CastToSignedWithoutBitChanges(static_cast<uint64_t>(max) 
- value);
+  int64_t min_incr = min - signed_value;
+
+  if (signed_value > max || (bits != 64 && incr > max_incr) || (signed_value 
>= 0 && incr >= 0 && incr > max_incr)) {
+    if (overflow == BitfieldOverflowBehavior::kWrap) {
+      *dst = WrappedSignedBitfieldPlus(value, incr, bits);
+    } else if (overflow == BitfieldOverflowBehavior::kSat) {
+      *dst = max;
+    } else {
+      DCHECK(overflow == BitfieldOverflowBehavior::kFail);
+    }
+    return true;
+  } else if (signed_value < min || (bits != 64 && incr < min_incr) ||
+             (signed_value < 0 && incr < 0 && incr < min_incr)) {
+    if (overflow == BitfieldOverflowBehavior::kWrap) {
+      *dst = WrappedSignedBitfieldPlus(value, incr, bits);
+    } else if (overflow == BitfieldOverflowBehavior::kSat) {
+      *dst = min;
+    } else {
+      DCHECK(overflow == BitfieldOverflowBehavior::kFail);
+    }
+    return true;
+  }
+
+  *dst = signed_value + incr;
+  return false;
+}
+
+static uint64_t WrappedUnsignedBitfieldPlus(uint64_t value, int64_t incr, 
uint8_t bits) {
+  uint64_t mask = std::numeric_limits<uint64_t>::max() << bits;
+  uint64_t res = value + incr;
+  res &= ~mask;
+  return res;
+}
+
+// See also 
https://github.com/redis/redis/blob/7f4bae817614988c43c3024402d16edcbf3b3277/src/bitops.c#L288
+StatusOr<bool> UnsignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t 
bits, BitfieldOverflowBehavior overflow,
+                                    uint64_t *dst) {
+  Status 
bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kUnsigned,
 bits));
+  if (!bits_status) {
+    return bits_status;
+  }
+
+  auto max = (static_cast<uint64_t>(1) << bits) - 1;
+  int64_t max_incr = CastToSignedWithoutBitChanges(max - value);
+  int64_t min_incr = CastToSignedWithoutBitChanges((~value) + 1);
+
+  if (value > max || (incr > 0 && incr > max_incr)) {
+    if (overflow == BitfieldOverflowBehavior::kWrap) {
+      *dst = WrappedUnsignedBitfieldPlus(value, incr, bits);
+    } else if (overflow == BitfieldOverflowBehavior::kSat) {
+      *dst = max;
+    } else {
+      DCHECK(overflow == BitfieldOverflowBehavior::kFail);
+    }
+    return true;
+  } else if (incr < 0 && incr < min_incr) {
+    if (overflow == BitfieldOverflowBehavior::kWrap) {
+      *dst = WrappedUnsignedBitfieldPlus(value, incr, bits);
+    } else if (overflow == BitfieldOverflowBehavior::kSat) {
+      *dst = 0;
+    } else {
+      DCHECK(overflow == BitfieldOverflowBehavior::kFail);
+    }
+    return true;
+  }
+
+  *dst = value + incr;
+  return false;
+}
+
+}  // namespace detail
diff --git a/src/common/bitfield_util.h b/src/common/bitfield_util.h
new file mode 100644
index 00000000..dbb44b1d
--- /dev/null
+++ b/src/common/bitfield_util.h
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+#include <limits>
+#include <stdexcept>
+#include <string>
+
+#include "status.h"
+
+enum class BitfieldOverflowBehavior : uint8_t { kWrap, kSat, kFail };
+
+class BitfieldEncoding {
+ public:
+  enum class Type : uint8_t { kSigned, kUnsigned };
+
+  // check whether the bit length is fit to given number sign.
+  // Redis has bits length limitation to bitfield.
+  // Quote:
+  // The supported encodings are up to 64 bits for signed integers, and up to 
63 bits for unsigned integers. This
+  // limitation with unsigned integers is due to the fact that currently the 
Redis protocol is unable to return 64 bit
+  // unsigned integers as replies.
+  // see also https://redis.io/commands/bitfield/
+  static Status CheckSupportedBitLengths(Type type, uint8_t bits) noexcept {
+    uint8_t max_bits = 64;
+    if (type == Type::kUnsigned) {
+      max_bits = 63;
+    }
+    if (1 <= bits && bits <= max_bits) {
+      return Status::OK();
+    }
+
+    return {Status::NotOK, "Unsupported new bits length, only i1~i64, u1~u63 
are supported."};
+  }
+
+  static StatusOr<BitfieldEncoding> Create(Type type, uint8_t bits) noexcept {
+    Status bits_status(CheckSupportedBitLengths(type, bits));
+    if (!bits_status) {
+      return bits_status;
+    }
+
+    BitfieldEncoding enc;
+    enc.type_ = type;
+    enc.bits_ = bits;
+    return enc;
+  }
+
+  Type GetType() const noexcept { return type_; }
+
+  bool IsSigned() const noexcept { return type_ == Type::kSigned; }
+
+  bool IsUnsigned() const noexcept { return type_ == Type::kUnsigned; }
+
+  uint8_t Bits() const noexcept { return bits_; }
+
+  Status SetBitsCount(uint8_t new_bits) noexcept {
+    Status bits_status(CheckSupportedBitLengths(type_, new_bits));
+    if (!bits_status) {
+      return bits_status;
+    }
+
+    bits_ = new_bits;
+    return Status::OK();
+  }
+
+  Status SetType(Type new_type) noexcept {
+    Status bits_status(CheckSupportedBitLengths(new_type, bits_));
+    if (!bits_status) {
+      return bits_status;
+    }
+
+    type_ = new_type;
+    return Status::OK();
+  }
+
+  std::string ToString() const noexcept { return (IsSigned() ? "i" : "u") + 
std::to_string(static_cast<int>(bits_)); }
+
+ private:
+  BitfieldEncoding() = default;
+
+  Type type_;
+  uint8_t bits_;
+};
+
+struct BitfieldOperation {
+  // see https://redis.io/commands/bitfield/ to get more details.
+  enum class Type : uint8_t { kGet, kSet, kIncrBy };
+
+  Type type;
+  BitfieldOverflowBehavior overflow{BitfieldOverflowBehavior::kWrap};
+  BitfieldEncoding 
encoding{BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 
32).GetValue()};
+  uint32_t offset;
+  // INCRBY amount or SET value
+  int64_t value;
+};
+
+namespace detail {
+// Let value add incr, according to the bits limit and overflow rule. The 
value is reguarded as a signed integer.
+// Return true if overflow. Status is not ok iff calling 
BitfieldEncoding::IsSupportedBitLengths()
+// for given op return false.
+StatusOr<bool> SignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, 
BitfieldOverflowBehavior overflow,
+                                  uint64_t *dst);
+
+// Let value add incr, according to the bits limit and overflow rule.  The 
value is reguarded as an unsigned integer.
+// Return true if overflow. Status is not ok iff calling 
BitfieldEncoding::IsSupportedBitLengths()
+// for given op return false.
+StatusOr<bool> UnsignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t 
bits, BitfieldOverflowBehavior overflow,
+                                    uint64_t *dst);
+}  // namespace detail
+
+// safe cast from unsigned to signed, without any bit changes.
+// see also "Integral conversions" on 
https://en.cppreference.com/w/cpp/language/implicit_conversion
+// If the destination type is signed, the result when overflow is 
implementation-defined until C++20
+inline int64_t CastToSignedWithoutBitChanges(uint64_t x) {
+  int64_t res = 0;
+  memcpy(&res, &x, sizeof(res));
+  return res;
+}
+
+class BitfieldValue {
+ public:
+  BitfieldValue(BitfieldEncoding encoding, uint64_t value) noexcept : 
encoding_(encoding), value_(value) {}
+
+  template <class T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
+  bool operator==(T rhs) const {
+    return value_ == static_cast<uint64_t>(rhs);
+  }
+
+  template <class T>
+  friend bool operator==(T lhs, const BitfieldValue &rhs) {
+    return rhs == lhs;
+  }
+
+  BitfieldEncoding Encoding() const noexcept { return encoding_; }
+
+  uint64_t Value() const noexcept { return value_; }
+
+ private:
+  BitfieldEncoding encoding_;
+  uint64_t value_;
+};
+
+// Let value add incr, according to the encoding and overflow rule.
+// return true if overflow. Status is not ok iff calling 
BitfieldEncoding::IsSupportedBitLengths()
+// for given op return false.
+inline StatusOr<bool> BitfieldPlus(uint64_t value, int64_t incr, 
BitfieldEncoding enc,
+                                   BitfieldOverflowBehavior overflow, uint64_t 
*dst) {
+  if (enc.IsSigned()) {
+    return detail::SignedBitfieldPlus(value, incr, enc.Bits(), overflow, dst);
+  }
+  return detail::UnsignedBitfieldPlus(value, incr, enc.Bits(), overflow, dst);
+}
+
+// return true if successful. Status is not ok iff calling 
BitfieldEncoding::IsSupportedBitLengths()
+// for given op return false.
+inline StatusOr<bool> BitfieldOp(BitfieldOperation op, uint64_t old_value, 
uint64_t *new_value) {
+  if (op.type == BitfieldOperation::Type::kGet) {
+    *new_value = old_value;
+    return true;
+  }
+
+  bool overflow = false;
+  if (op.type == BitfieldOperation::Type::kSet) {
+    overflow = GET_OR_RET(BitfieldPlus(op.value, 0, op.encoding, op.overflow, 
new_value));
+  } else {
+    overflow = GET_OR_RET(BitfieldPlus(old_value, op.value, op.encoding, 
op.overflow, new_value));
+  }
+
+  return op.overflow != BitfieldOverflowBehavior::kFail || !overflow;
+}
+
+// Use a small buffer to store a range of bytes on a bitmap.
+// If you try to visit other place on bitmap, it will failed.
+class ArrayBitfieldBitmap {
+ public:
+  // byte_offset is the byte offset of view in entire bitmap.
+  explicit ArrayBitfieldBitmap(uint32_t byte_offset = 0) noexcept : 
byte_offset_(byte_offset) { Reset(); }
+
+  ArrayBitfieldBitmap(const ArrayBitfieldBitmap &) = delete;
+  ArrayBitfieldBitmap &operator=(const ArrayBitfieldBitmap &) = delete;
+  ~ArrayBitfieldBitmap() = default;
+
+  // Change the position this represents.
+  void SetByteOffset(uint64_t byte_offset) noexcept { byte_offset_ = 
byte_offset; }
+
+  void Reset() { memset(buf_, 0, sizeof(buf_)); }
+
+  Status Set(uint32_t byte_offset, uint32_t bytes, const uint8_t *src) {
+    Status bound_status(checkLegalBound(byte_offset, bytes));
+    if (!bound_status) {
+      return bound_status;
+    }
+    byte_offset -= byte_offset_;
+    memcpy(buf_ + byte_offset, src, bytes);
+    return Status::OK();
+  }
+
+  Status Get(uint32_t byte_offset, uint32_t bytes, uint8_t *dst) const {
+    Status bound_status(checkLegalBound(byte_offset, bytes));
+    if (!bound_status) {
+      return bound_status;
+    }
+    byte_offset -= byte_offset_;
+    memcpy(dst, buf_ + byte_offset, bytes);
+    return Status::OK();
+  }
+
+  StatusOr<uint64_t> GetUnsignedBitfield(uint64_t bit_offset, uint64_t bits) 
const {
+    Status 
bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kUnsigned,
 bits));
+    if (!bits_status) {
+      return bits_status;
+    }
+    return getBitfield(bit_offset, bits);
+  }
+
+  StatusOr<int64_t> GetSignedBitfield(uint64_t bit_offset, uint64_t bits) 
const {
+    Status 
bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kSigned,
 bits));
+    if (!bits_status) {
+      return bits_status;
+    }
+    uint64_t bitfield = GET_OR_RET(getBitfield(bit_offset, bits));
+    int64_t value = CastToSignedWithoutBitChanges(bitfield);
+
+    // for a bits of k signed number, 1 << (bits - 1) is the MSB 
(most-significant bit).
+    // the number is a negative when the MSB is "1".
+    auto msb = static_cast<uint64_t>(1) << (bits - 1);  // NOLINT
+    if ((value & msb) != 0) {
+      // The way of enlarge width of a signed integer is sign-extended.
+      // The values of higher bits should all "1", when the number is negative.
+      // For example:
+      // constexpr int32_t a = -128;
+      // static_assert(a == 0xffffff80);
+      // constexpr int64_t b = a; // b is -128 too.
+      // static_assert(b == 0xffffffffffffff80);
+      value |= 
CastToSignedWithoutBitChanges(std::numeric_limits<uint64_t>::max() << bits);
+    }
+    return value;
+  }
+
+  Status SetBitfield(uint32_t bit_offset, uint32_t bits, uint64_t value) {
+    uint32_t first_byte = bit_offset / 8;
+    uint32_t last_byte = (bit_offset + bits - 1) / 8 + 1;
+    Status bound_status(checkLegalBound(first_byte, last_byte - first_byte));
+    if (!bound_status) {
+      return bound_status;
+    }
+
+    bit_offset -= byte_offset_ * 8;
+    while (bits--) {
+      bool v = (value & (uint64_t(1) << bits)) != 0;
+      uint32_t byte = bit_offset >> 3;
+      uint32_t bit = 7 - (bit_offset & 7);
+      uint32_t byteval = buf_[byte];
+      byteval &= ~(1 << bit);
+      byteval |= int(v) << bit;
+      buf_[byte] = char(byteval);
+      bit_offset++;
+    }
+    return Status::OK();
+  }
+
+ private:
+  // The bit field cannot exceed 64 bits, takes an extra byte when it 
unaligned.
+  static constexpr uint32_t kSize = 8 + 1;
+
+  Status checkLegalBound(uint32_t byte_offset, uint32_t bytes) const noexcept {
+    if (byte_offset < byte_offset_ || byte_offset_ + kSize < byte_offset + 
bytes) {
+      return {Status::NotOK, "The range [offset, offset + bytes) is out of 
bitfield."};
+    }
+    return Status::OK();
+  }
+
+  StatusOr<uint64_t> getBitfield(uint32_t bit_offset, uint8_t bits) const {
+    uint32_t first_byte = bit_offset / 8;
+    uint32_t last_byte = (bit_offset + bits - 1) / 8 + 1;
+    Status bound_status(checkLegalBound(first_byte, last_byte - first_byte));
+    if (!bound_status) {
+      return bound_status;
+    }
+
+    bit_offset -= byte_offset_ * 8;
+    uint64_t value = 0;
+    while (bits--) {
+      uint32_t byte = bit_offset >> 3;
+      uint32_t bit = 7 - (bit_offset & 0x7);
+      uint32_t byteval = buf_[byte];
+      uint32_t bitval = (byteval >> bit) & 1;
+      value = (value << 1) | bitval;
+      bit_offset++;
+    }
+    return value;
+  }
+
+  uint8_t buf_[kSize];
+  uint32_t byte_offset_;
+};
diff --git a/src/common/lock_manager.h b/src/common/lock_manager.h
index ee4e1742..782d6c82 100644
--- a/src/common/lock_manager.h
+++ b/src/common/lock_manager.h
@@ -92,10 +92,18 @@ class LockGuard {
   LockGuard(const LockGuard &) = delete;
   LockGuard &operator=(const LockGuard &) = delete;
 
-  LockGuard(LockGuard &&guard) : lock_(guard.lock_) { guard.lock_ = nullptr; }
+  LockGuard(LockGuard &&guard) noexcept : lock_(guard.lock_) { guard.lock_ = 
nullptr; }
+
+  LockGuard &operator=(LockGuard &&other) noexcept {
+    if (&other != this) {
+      std::destroy_at(this);
+      new (this) LockGuard(std::move(other));
+    }
+    return *this;
+  }
 
  private:
-  std::mutex *lock_;
+  std::mutex *lock_{nullptr};
 };
 
 class MultiLockGuard {
diff --git a/src/common/parse_util.h b/src/common/parse_util.h
index 63c03c00..3e3cf116 100644
--- a/src/common/parse_util.h
+++ b/src/common/parse_util.h
@@ -33,13 +33,18 @@ namespace details {
 template <typename>
 struct ParseIntFunc;
 
+template <>
+struct ParseIntFunc<char> {  // NOLINT
+  constexpr static const auto value = std::strtol;
+};
+
 template <>
 struct ParseIntFunc<short> {  // NOLINT
   constexpr static const auto value = std::strtol;
 };
 
 template <>
-struct ParseIntFunc<int> {
+struct ParseIntFunc<int> {  // NOLINT
   constexpr static const auto value = std::strtol;
 };
 
@@ -53,13 +58,18 @@ struct ParseIntFunc<long long> {  // NOLINT
   constexpr static const auto value = std::strtoll;
 };
 
+template <>
+struct ParseIntFunc<unsigned char> {  // NOLINT
+  constexpr static const auto value = std::strtoul;
+};
+
 template <>
 struct ParseIntFunc<unsigned short> {  // NOLINT
   constexpr static const auto value = std::strtoul;
 };
 
 template <>
-struct ParseIntFunc<unsigned> {
+struct ParseIntFunc<unsigned> {  // NOLINT
   constexpr static const auto value = std::strtoul;
 };
 
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index c66aca80..db7174d0 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -232,6 +232,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
               first_seen_ = false;
             }
             break;
+          case kRedisCmdBitfield:
+            command_args = {"BITFIELD", user_key};
+            command_args.insert(command_args.end(), args->begin() + 1, 
args->end());
+            break;
           default:
             LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Bitmap: 
unhandled command with code "
                        << *parsed_cmd;
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 48c40f67..cbdb7a41 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -61,6 +61,7 @@ enum RedisCommand {
   kRedisCmdExpire,
   kRedisCmdSetBit,
   kRedisCmdBitOp,
+  kRedisCmdBitfield,
   kRedisCmdLMove,
 };
 
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index a4c71cd2..b0fc65f6 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -38,6 +38,25 @@ const char kErrBitmapStringOutOfRange[] =
     "The size of the bitmap string exceeds the "
     "configuration item max-bitmap-to-string-mb";
 
+// Resize the segment to makes its new length at least min_bytes, new bytes 
will be set to 0.
+// min_bytes can not more than kBitmapSegmentBytes
+void ExpandBitmapSegment(std::string *segment, size_t min_bytes) {
+  assert(min_bytes <= kBitmapSegmentBytes);
+
+  auto old_size = segment->size();
+  if (min_bytes > old_size) {
+    size_t new_size = 0;
+    if (min_bytes > old_size * 2) {
+      new_size = min_bytes;
+    } else if (old_size * 2 > kBitmapSegmentBytes) {
+      new_size = kBitmapSegmentBytes;
+    } else {
+      new_size = old_size * 2;
+    }
+    segment->resize(new_size, 0);
+  }
+}
+
 rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, BitmapMetadata 
*metadata, std::string *raw_value) {
   std::string old_metadata;
   metadata->Encode(&old_metadata);
@@ -187,17 +206,7 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, 
uint32_t offset, bool new_
   uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
   uint64_t used_size = index + byte_index + 1;
   uint64_t bitmap_size = std::max(used_size, metadata.size);
-  if (byte_index >= value.size()) {  // expand the bitmap
-    size_t expand_size = 0;
-    if (byte_index >= value.size() * 2) {
-      expand_size = byte_index - value.size() + 1;
-    } else if (value.size() * 2 > kBitmapSegmentBytes) {
-      expand_size = kBitmapSegmentBytes - value.size();
-    } else {
-      expand_size = value.size();
-    }
-    value.append(expand_size, 0);
-  }
+  ExpandBitmapSegment(&value, byte_index + 1);
   uint32_t bit_offset = offset % 8;
   *old_bit = (value[byte_index] & (1 << bit_offset)) != 0;
   if (new_bit) {
@@ -539,6 +548,321 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const 
std::string &op_name, co
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+// SegmentCacheStore is used to read segments from storage.
+class Bitmap::SegmentCacheStore {
+ public:
+  SegmentCacheStore(engine::Storage *storage, rocksdb::ColumnFamilyHandle 
*metadata_cf_handle,
+                    std::string namespace_key, const Metadata &bitmap_metadata)
+      : storage_(storage),
+        metadata_cf_handle_(metadata_cf_handle),
+        ns_key_(std::move(namespace_key)),
+        metadata_(bitmap_metadata) {}
+
+  // Get a read-only segment by given index
+  rocksdb::Status Get(uint32_t index, const std::string **cache) {
+    std::string *res = nullptr;
+    auto s = get(index, /*set_dirty=*/false, &res);
+    if (s.ok()) {
+      *cache = res;
+    }
+    return s;
+  }
+
+  // Get a segment by given index, and mark it dirty.
+  rocksdb::Status GetMut(uint32_t index, std::string **cache) { return 
get(index, /*set_dirty=*/true, cache); }
+
+  // Add all dirty segments into write batch.
+  void BatchForFlush(ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch) {
+    uint64_t used_size = 0;
+    for (auto &[index, content] : cache_) {
+      if (content.first) {
+        std::string sub_key =
+            InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, 
storage_->IsSlotIdEncoded()).Encode();
+        batch->Put(sub_key, content.second);
+        used_size = std::max(used_size, static_cast<uint64_t>(index) * 
kBitmapSegmentBytes + content.second.size());
+      }
+    }
+    if (used_size > metadata_.size) {
+      metadata_.size = used_size;
+      std::string bytes;
+      metadata_.Encode(&bytes);
+      batch->Put(metadata_cf_handle_, ns_key_, bytes);
+    }
+  }
+
+ private:
+  rocksdb::Status get(uint32_t index, bool set_dirty, std::string **cache) {
+    auto [seg_itor, no_cache] = cache_.try_emplace(index);
+    auto &[is_dirty, str] = seg_itor->second;
+
+    if (no_cache) {
+      is_dirty = false;
+      std::string sub_key =
+          InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, 
storage_->IsSlotIdEncoded()).Encode();
+      rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), sub_key, &str);
+      if (!s.ok() && !s.IsNotFound()) {
+        return s;
+      }
+    }
+
+    is_dirty |= set_dirty;
+    *cache = &str;
+    return rocksdb::Status::OK();
+  }
+
+  static std::string getSegmentSubKey(uint32_t index) { return 
std::to_string(index * kBitmapSegmentBytes); }
+
+  engine::Storage *storage_;
+  rocksdb::ColumnFamilyHandle *metadata_cf_handle_;
+  std::string ns_key_;
+  Metadata metadata_;
+  // Segment index -> [is_dirty, segment_cache_string]
+  std::unordered_map<uint32_t, std::pair<bool, std::string>> cache_;
+};
+
+// Copy a range of bytes from entire bitmap and store them into 
ArrayBitfieldBitmap.
+static rocksdb::Status CopySegmentsBytesToBitfield(Bitmap::SegmentCacheStore 
&store, uint32_t byte_offset,
+                                                   uint32_t bytes, 
ArrayBitfieldBitmap *bitfield) {
+  bitfield->SetByteOffset(byte_offset);
+  bitfield->Reset();
+
+  uint32_t segment_index = byte_offset / kBitmapSegmentBytes;
+  int64_t remain_bytes = bytes;
+  // the byte_offset in current segment.
+  auto segment_byte_offset = static_cast<int>(byte_offset % 
kBitmapSegmentBytes);
+  for (; remain_bytes > 0; ++segment_index) {
+    const std::string *cache = nullptr;
+    auto cache_status = store.Get(segment_index, &cache);
+    if (!cache_status.ok()) {
+      return cache_status;
+    }
+
+    auto cache_size = static_cast<int>(cache->size());
+    auto copyable = std::max(0, cache_size - segment_byte_offset);
+    auto copy_count = std::min(static_cast<int>(remain_bytes), copyable);
+    auto src = reinterpret_cast<const uint8_t *>(cache->data() + 
segment_byte_offset);
+    auto status = bitfield->Set(byte_offset, copy_count, src);
+    if (!status) {
+      return rocksdb::Status::InvalidArgument();
+    }
+
+    // next segment will copy from its front.
+    byte_offset = (segment_index + 1) * kBitmapSegmentBytes;
+    // maybe negative, but still correct.
+    remain_bytes -= kBitmapSegmentBytes - segment_byte_offset;
+    segment_byte_offset = 0;
+  }
+
+  return rocksdb::Status::OK();
+}
+
+static rocksdb::Status GetBitfieldInteger(const ArrayBitfieldBitmap &bitfield, 
uint32_t bit_offset,
+                                          BitfieldEncoding enc, uint64_t *res) 
{
+  if (enc.IsSigned()) {
+    auto status = bitfield.GetSignedBitfield(bit_offset, enc.Bits());
+    if (!status) {
+      return rocksdb::Status::InvalidArgument();
+    }
+    *res = status.GetValue();
+  } else {
+    auto status = bitfield.GetUnsignedBitfield(bit_offset, enc.Bits());
+    if (!status) {
+      return rocksdb::Status::InvalidArgument();
+    }
+    *res = status.GetValue();
+  }
+  return rocksdb::Status::OK();
+}
+
+static rocksdb::Status CopyBitfieldBytesToSegments(Bitmap::SegmentCacheStore 
&store,
+                                                   const ArrayBitfieldBitmap 
&bitfield, uint32_t byte_offset,
+                                                   uint32_t bytes) {
+  uint32_t segment_index = byte_offset / kBitmapSegmentBytes;
+  auto segment_byte_offset = static_cast<int>(byte_offset % 
kBitmapSegmentBytes);
+  auto remain_bytes = static_cast<int32_t>(bytes);
+  for (; remain_bytes > 0; ++segment_index) {
+    std::string *cache = nullptr;
+    auto cache_status = store.GetMut(segment_index, &cache);
+    if (!cache_status.ok()) {
+      return cache_status;
+    }
+
+    auto copy_count = std::min(remain_bytes, 
static_cast<int32_t>(kBitmapSegmentBytes - segment_byte_offset));
+    if (static_cast<int>(cache->size()) < segment_byte_offset + copy_count) {
+      cache->resize(segment_byte_offset + copy_count);
+    }
+
+    auto dst = reinterpret_cast<uint8_t *>(cache->data()) + 
segment_byte_offset;
+    auto status = bitfield.Get(byte_offset, copy_count, dst);
+    if (!status) {
+      return rocksdb::Status::InvalidArgument();
+    }
+
+    // next segment will copy from its front.
+    byte_offset = (segment_index + 1) * kBitmapSegmentBytes;
+    // maybe negative, but still correct.
+    remain_bytes -= static_cast<int32_t>(kBitmapSegmentBytes - 
segment_byte_offset);
+    segment_byte_offset = 0;
+  }
+  return rocksdb::Status::OK();
+}
+
+template <bool ReadOnly>
+rocksdb::Status Bitmap::bitfield(const Slice &user_key, const 
std::vector<BitfieldOperation> &ops,
+                                 std::vector<std::optional<BitfieldValue>> 
*rets) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  std::optional<LockGuard> guard;
+  if constexpr (!ReadOnly) {
+    guard = LockGuard(storage_->GetLockManager(), ns_key);
+  }
+
+  BitmapMetadata metadata;
+  std::string raw_value;
+  auto s = GetMetadata(ns_key, &metadata, &raw_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (metadata.Type() == RedisType::kRedisString) {
+    if constexpr (ReadOnly) {
+      s = BitmapString::BitfieldReadOnly(ns_key, raw_value, ops, rets);
+    } else {
+      s = BitmapString(storage_, namespace_).Bitfield(ns_key, &raw_value, ops, 
rets);
+    }
+    return s;
+  }
+
+  if (metadata.Type() != RedisType::kRedisBitmap) {
+    return rocksdb::Status::InvalidArgument("The value is not a bitmap or 
string.");
+  }
+
+  // We firstly do the bitfield operation by fetching segments into memory.
+  // Use SegmentCacheStore to record dirty segments. (if not read-only mode)
+  SegmentCacheStore cache(storage_, metadata_cf_handle_, ns_key, metadata);
+  runBitfieldOperationsWithCache<ReadOnly>(cache, ops, rets);
+
+  if constexpr (!ReadOnly) {
+    // Write changes into storage.
+    auto batch = storage_->GetWriteBatchBase();
+    if (bitfieldWriteAheadLog(batch, ops)) {
+      cache.BatchForFlush(batch);
+      return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+    }
+  }
+  return rocksdb::Status::OK();
+}
+
+template <bool ReadOnly>
+rocksdb::Status Bitmap::runBitfieldOperationsWithCache(SegmentCacheStore 
&cache,
+                                                       const 
std::vector<BitfieldOperation> &ops,
+                                                       
std::vector<std::optional<BitfieldValue>> *rets) {
+  ArrayBitfieldBitmap bitfield;
+  for (BitfieldOperation op : ops) {
+    // found all bytes that contents the bitfield.
+    uint32_t first_byte = op.offset / 8;
+    uint32_t last_bytes = (op.offset + op.encoding.Bits() - 1) / 8 + 1;
+    uint32_t bytes = last_bytes - first_byte;
+
+    auto segment_status = CopySegmentsBytesToBitfield(cache, first_byte, 
bytes, &bitfield);
+    if (!segment_status.ok()) {
+      return segment_status;
+    }
+
+    // Covert the bitfield from a buffer to an integer.
+    uint64_t unsigned_old_value = 0;
+    auto s = GetBitfieldInteger(bitfield, op.offset, op.encoding, 
&unsigned_old_value);
+    if (!s.ok()) {
+      return s;
+    }
+
+    if constexpr (ReadOnly) {
+      rets->emplace_back() = {op.encoding, unsigned_old_value};
+      continue;
+    }
+
+    auto &ret = rets->emplace_back();
+    uint64_t unsigned_new_value = 0;
+    // BitfieldOp failed only when the length or bits illegal.
+    // BitfieldOperation already check above case in construction function.
+    if (BitfieldOp(op, unsigned_old_value, &unsigned_new_value).GetValue()) {
+      if (op.type != BitfieldOperation::Type::kGet) {
+        Status _ = bitfield.SetBitfield(op.offset, op.encoding.Bits(), 
unsigned_new_value);
+        s = CopyBitfieldBytesToSegments(cache, bitfield, first_byte, bytes);
+        if (!s.ok()) {
+          return s;
+        }
+      }
+
+      if (op.type == BitfieldOperation::Type::kSet) {
+        unsigned_new_value = unsigned_old_value;
+      }
+
+      ret = {op.encoding, unsigned_new_value};
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+template rocksdb::Status Bitmap::bitfield<false>(const Slice &, const 
std::vector<BitfieldOperation> &,
+                                                 
std::vector<std::optional<BitfieldValue>> *);
+template rocksdb::Status Bitmap::bitfield<true>(const Slice &, const 
std::vector<BitfieldOperation> &,
+                                                
std::vector<std::optional<BitfieldValue>> *);
+
+// Return true if there are any write operation to bitmap. Otherwise return 
false.
+bool Bitmap::bitfieldWriteAheadLog(const 
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                   const std::vector<BitfieldOperation> &ops) {
+  std::vector<std::string> cmd_args{std::to_string(kRedisCmdBitfield)};
+  auto current_overflow = BitfieldOverflowBehavior::kWrap;
+  for (BitfieldOperation op : ops) {
+    if (op.type == BitfieldOperation::Type::kGet) {
+      continue;
+    }
+    if (current_overflow != op.overflow) {
+      current_overflow = op.overflow;
+      std::string overflow_str;
+      switch (op.overflow) {
+        case BitfieldOverflowBehavior::kWrap:
+          overflow_str = "WRAP";
+          break;
+        case BitfieldOverflowBehavior::kSat:
+          overflow_str = "SAT";
+          break;
+        case BitfieldOverflowBehavior::kFail:
+          overflow_str = "FAIL";
+          break;
+      }
+      cmd_args.emplace_back("OVERFLOW");
+      cmd_args.emplace_back(std::move(overflow_str));
+    }
+
+    if (op.type == BitfieldOperation::Type::kSet) {
+      cmd_args.emplace_back("SET");
+    } else {
+      cmd_args.emplace_back("INCRBY");
+    }
+    cmd_args.push_back(op.encoding.ToString());
+    cmd_args.push_back(std::to_string(op.offset));
+    if (op.type == BitfieldOperation::Type::kSet) {
+      if (op.encoding.IsSigned()) {
+        cmd_args.push_back(std::to_string(op.value));
+      } else {
+        cmd_args.push_back(std::to_string(static_cast<uint64_t>(op.value)));
+      }
+    } else {
+      cmd_args.push_back(std::to_string(op.value));
+    }
+  }
+
+  if (cmd_args.size() > 1) {
+    WriteBatchLogData log_data(kRedisBitmap, std::move(cmd_args));
+    batch->PutLogData(log_data.Encode());
+    return true;
+  }
+  return false;
+}
+
 bool Bitmap::GetBitFromValueAndOffset(const std::string &value, uint32_t 
offset) {
   bool bit = false;
   uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
diff --git a/src/types/redis_bitmap.h b/src/types/redis_bitmap.h
index 7b12834f..a0ada45b 100644
--- a/src/types/redis_bitmap.h
+++ b/src/types/redis_bitmap.h
@@ -20,9 +20,11 @@
 
 #pragma once
 
+#include <optional>
 #include <string>
 #include <vector>
 
+#include "common/bitfield_util.h"
 #include "storage/redis_db.h"
 #include "storage/redis_metadata.h"
 
@@ -41,6 +43,8 @@ namespace redis {
 
 class Bitmap : public Database {
  public:
+  class SegmentCacheStore;
+
   Bitmap(engine::Storage *storage, const std::string &ns) : Database(storage, 
ns) {}
   rocksdb::Status GetBit(const Slice &user_key, uint32_t offset, bool *bit);
   rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size, 
std::string *value);
@@ -49,11 +53,31 @@ class Bitmap : public Database {
   rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, 
int64_t stop, bool stop_given, int64_t *pos);
   rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const 
Slice &user_key,
                         const std::vector<Slice> &op_keys, int64_t *len);
+  rocksdb::Status Bitfield(const Slice &user_key, const 
std::vector<BitfieldOperation> &ops,
+                           std::vector<std::optional<BitfieldValue>> *rets) {
+    return bitfield<false>(user_key, ops, rets);
+  }
+  // read-only version for Bitfield(), if there is a write operation in ops, 
the function will return with failed
+  // status.
+  rocksdb::Status BitfieldReadOnly(const Slice &user_key, const 
std::vector<BitfieldOperation> &ops,
+                                   std::vector<std::optional<BitfieldValue>> 
*rets) {
+    return bitfield<true>(user_key, ops, rets);
+  }
   static bool GetBitFromValueAndOffset(const std::string &value, uint32_t 
offset);
   static bool IsEmptySegment(const Slice &segment);
 
  private:
+  template <bool ReadOnly>
+  rocksdb::Status bitfield(const Slice &user_key, const 
std::vector<BitfieldOperation> &ops,
+                           std::vector<std::optional<BitfieldValue>> *rets);
+  static bool bitfieldWriteAheadLog(const 
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                    const std::vector<BitfieldOperation> &ops);
   rocksdb::Status GetMetadata(const Slice &ns_key, BitmapMetadata *metadata, 
std::string *raw_value);
+
+  template <bool ReadOnly>
+  static rocksdb::Status runBitfieldOperationsWithCache(SegmentCacheStore 
&cache,
+                                                        const 
std::vector<BitfieldOperation> &ops,
+                                                        
std::vector<std::optional<BitfieldValue>> *rets);
 };
 
 }  // namespace redis
diff --git a/src/types/redis_bitmap_string.cc b/src/types/redis_bitmap_string.cc
index 38a0c8f3..d9d77114 100644
--- a/src/types/redis_bitmap_string.cc
+++ b/src/types/redis_bitmap_string.cc
@@ -25,6 +25,7 @@
 #include <cstdint>
 
 #include "redis_string.h"
+#include "server/redis_reply.h"
 #include "storage/redis_metadata.h"
 #include "type_util.h"
 
@@ -204,4 +205,88 @@ int64_t BitmapString::RawBitpos(const uint8_t *c, int64_t 
count, bool bit) {
   return res;
 }
 
+rocksdb::Status BitmapString::Bitfield(const Slice &ns_key, std::string 
*raw_value,
+                                       const std::vector<BitfieldOperation> 
&ops,
+                                       
std::vector<std::optional<BitfieldValue>> *rets) {
+  auto header_offset = Metadata::GetOffsetAfterExpire((*raw_value)[0]);
+  std::string string_value = raw_value->substr(header_offset);
+  for (BitfieldOperation op : ops) {
+    // [first_byte, last_byte)
+    uint32_t first_byte = op.offset / 8;
+    uint32_t last_byte = (op.offset + op.encoding.Bits() - 1) / 8 + 1;
+
+    // expand string if need.
+    if (string_value.size() < last_byte) {
+      string_value.resize(last_byte);
+    }
+
+    ArrayBitfieldBitmap bitfield(first_byte);
+    auto str = reinterpret_cast<const uint8_t *>(string_value.data() + 
first_byte);
+    auto s = bitfield.Set(first_byte, last_byte - first_byte, str);
+
+    uint64_t unsigned_old_value = 0;
+    if (op.encoding.IsSigned()) {
+      unsigned_old_value = bitfield.GetSignedBitfield(op.offset, 
op.encoding.Bits()).GetValue();
+    } else {
+      unsigned_old_value = bitfield.GetUnsignedBitfield(op.offset, 
op.encoding.Bits()).GetValue();
+    }
+
+    uint64_t unsigned_new_value = 0;
+    auto &ret = rets->emplace_back();
+    if (BitfieldOp(op, unsigned_old_value, &unsigned_new_value).GetValue()) {
+      if (op.type != BitfieldOperation::Type::kGet) {
+        // never failed.
+        s = bitfield.SetBitfield(op.offset, op.encoding.Bits(), 
unsigned_new_value);
+        auto dst = reinterpret_cast<uint8_t *>(string_value.data()) + 
first_byte;
+        s = bitfield.Get(first_byte, last_byte - first_byte, dst);
+      }
+
+      if (op.type == BitfieldOperation::Type::kSet) {
+        unsigned_new_value = unsigned_old_value;
+      }
+
+      ret = {op.encoding, unsigned_new_value};
+    }
+  }
+
+  raw_value->resize(header_offset);
+  raw_value->append(string_value);
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisString);
+  batch->PutLogData(log_data.Encode());
+  batch->Put(metadata_cf_handle_, ns_key, *raw_value);
+
+  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status BitmapString::BitfieldReadOnly(const Slice &ns_key, const 
std::string &raw_value,
+                                               const 
std::vector<BitfieldOperation> &ops,
+                                               
std::vector<std::optional<BitfieldValue>> *rets) {
+  std::string_view string_value = raw_value;
+  string_value = 
string_value.substr(Metadata::GetOffsetAfterExpire(string_value[0]));
+
+  for (BitfieldOperation op : ops) {
+    if (op.type != BitfieldOperation::Type::kGet) {
+      return rocksdb::Status::InvalidArgument("Write bitfield in read-only 
mode.");
+    }
+
+    uint32_t first_byte = op.offset / 8;
+    uint32_t last_byte = (op.offset + op.encoding.Bits() - 1) / 8 + 1;
+
+    ArrayBitfieldBitmap bitfield(first_byte);
+    auto s = bitfield.Set(first_byte, last_byte - first_byte,
+                          reinterpret_cast<const uint8_t 
*>(string_value.data() + first_byte));
+
+    if (op.encoding.IsSigned()) {
+      int64_t value = bitfield.GetSignedBitfield(op.offset, 
op.encoding.Bits()).GetValue();
+      rets->emplace_back(std::in_place, op.encoding, 
static_cast<uint64_t>(value));
+    } else {
+      uint64_t value = bitfield.GetUnsignedBitfield(op.offset, 
op.encoding.Bits()).GetValue();
+      rets->emplace_back(std::in_place, op.encoding, value);
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
 }  // namespace redis
diff --git a/src/types/redis_bitmap_string.h b/src/types/redis_bitmap_string.h
index 027c6615..ab61c211 100644
--- a/src/types/redis_bitmap_string.h
+++ b/src/types/redis_bitmap_string.h
@@ -20,9 +20,11 @@
 
 #pragma once
 
+#include <optional>
 #include <string>
 #include <vector>
 
+#include "common/bitfield_util.h"
 #include "storage/redis_db.h"
 #include "storage/redis_metadata.h"
 
@@ -36,6 +38,11 @@ class BitmapString : public Database {
   static rocksdb::Status BitCount(const std::string &raw_value, int64_t start, 
int64_t stop, uint32_t *cnt);
   static rocksdb::Status BitPos(const std::string &raw_value, bool bit, 
int64_t start, int64_t stop, bool stop_given,
                                 int64_t *pos);
+  rocksdb::Status Bitfield(const Slice &ns_key, std::string *raw_value, const 
std::vector<BitfieldOperation> &ops,
+                           std::vector<std::optional<BitfieldValue>> *rets);
+  static rocksdb::Status BitfieldReadOnly(const Slice &ns_key, const 
std::string &raw_value,
+                                          const std::vector<BitfieldOperation> 
&ops,
+                                          
std::vector<std::optional<BitfieldValue>> *rets);
 
   static size_t RawPopcount(const uint8_t *p, int64_t count);
   static int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit);
diff --git a/tests/cppunit/bitfield_util.cc b/tests/cppunit/bitfield_util.cc
new file mode 100644
index 00000000..71976df4
--- /dev/null
+++ b/tests/cppunit/bitfield_util.cc
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "common/bitfield_util.h"
+
+#include <gtest/gtest.h>
+
+#include <cstring>
+#include <vector>
+
+#include "common/encoding.h"
+
+TEST(BitfieldUtil, Get) {
+  std::vector<uint8_t> big_endian_bitmap{0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 
0x00, 0xff};
+  std::vector<uint8_t> little_endian_bitmap(big_endian_bitmap);
+  std::reverse(little_endian_bitmap.begin(), little_endian_bitmap.end());
+
+  ArrayBitfieldBitmap bitfield(0);
+  auto s = bitfield.Set(0, big_endian_bitmap.size(), big_endian_bitmap.data());
+
+  for (int bits = 16; bits < 64; bits *= 2) {
+    for (uint64_t offset = 0; bits + offset <= big_endian_bitmap.size() * 8; 
offset += bits) {
+      uint64_t value = bitfield.GetUnsignedBitfield(offset, bits).GetValue();
+      if (IsBigEndian()) {
+        EXPECT_EQ(0, memcmp(&value, big_endian_bitmap.data(), bits / 8));
+      } else {
+        EXPECT_EQ(0, memcmp(&value, little_endian_bitmap.data(), bits / 8));
+      }
+    }
+  }
+}
diff --git a/tests/cppunit/types/bitmap_test.cc 
b/tests/cppunit/types/bitmap_test.cc
index 75f2d3f8..197f0ae9 100644
--- a/tests/cppunit/types/bitmap_test.cc
+++ b/tests/cppunit/types/bitmap_test.cc
@@ -24,16 +24,21 @@
 
 #include "test_base.h"
 #include "types/redis_bitmap.h"
+#include "types/redis_string.h"
 
 class RedisBitmapTest : public TestBase {
  protected:
-  explicit RedisBitmapTest() { bitmap_ = 
std::make_unique<redis::Bitmap>(storage_, "bitmap_ns"); }
+  explicit RedisBitmapTest() {
+    bitmap_ = std::make_unique<redis::Bitmap>(storage_, "bitmap_ns");
+    string_ = std::make_unique<redis::String>(storage_, "bitmap_ns");
+  }
   ~RedisBitmapTest() override = default;
 
   void SetUp() override { key_ = "test_bitmap_key"; }
   void TearDown() override {}
 
   std::unique_ptr<redis::Bitmap> bitmap_;
+  std::unique_ptr<redis::String> string_;
 };
 
 TEST_F(RedisBitmapTest, GetAndSetBit) {
@@ -89,3 +94,365 @@ TEST_F(RedisBitmapTest, BitPosSetBit) {
   }
   auto s = bitmap_->Del(key_);
 }
+
+TEST_F(RedisBitmapTest, BitfieldGetSetTest) {
+  constexpr uint32_t magic = 0xdeadbeef;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, 
32).GetValue();
+  op.offset = 114514;
+  op.value = magic;
+
+  EXPECT_TRUE(bitmap_->Bitfield(key_, {op}, &rets).ok());
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kGet;
+  auto _ = op.encoding.SetBitsCount(1);
+
+  // bitfield is stored in big-endian.
+  for (int i = 31; i != -1; --i) {
+    EXPECT_TRUE(bitmap_->Bitfield(key_, {op}, &rets).ok());
+    EXPECT_EQ((magic >> i) & 1, rets[0].value());
+    rets.clear();
+    op.offset++;
+  }
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, UnsignedBitfieldTest) {
+  constexpr uint8_t bits = 5;
+  static_assert(bits < 64);
+  constexpr uint64_t max = (uint64_t(1) << bits) - 1;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, 
bits).GetValue();
+  op.offset = 8189;  // the two bitmap segments divide the bitfield
+  op.value = 0;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  for (uint64_t i = 1; i <= max; ++i) {
+    op.value = int64_t(i);
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(i - 1, rets[0].value());
+    rets.clear();
+  }
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, SignedBitfieldTest) {
+  constexpr uint8_t bits = 10;
+  constexpr int64_t max = (uint64_t(1) << (bits - 1)) - 1;
+  constexpr int64_t min = -max - 1;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 
bits).GetValue();
+  op.offset = 8189;  // the two bitmap segments divide the bitfield
+  op.value = min;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  for (int64_t i = min + 1; i <= max; ++i) {
+    op.value = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(i - 1, rets[0].value());
+    rets.clear();
+  }
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, SignedBitfieldWrapSetTest) {
+  constexpr uint8_t bits = 6;
+  constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1;
+  constexpr int64_t min = -max - 1;
+  constexpr int64_t loopback = int64_t(1) << bits;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 
bits).GetValue();
+  op.offset = 0;
+  op.value = max;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kIncrBy;
+  op.value = 1;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(min, rets[0].value());
+  rets.clear();
+
+  op.value = loopback;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(min, rets[0].value());
+  rets.clear();
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, UnsignedBitfieldWrapSetTest) {
+  constexpr uint8_t bits = 6;
+  static_assert(bits < 64);
+  constexpr uint64_t max = (uint64_t(1) << bits) - 1;
+  constexpr int64_t loopback = int64_t(1) << bits;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, 
bits).GetValue();
+  op.offset = 0;
+  op.value = max;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kIncrBy;
+  op.value = 1;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.value = loopback;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, SignedBitfieldSatSetTest) {
+  constexpr uint8_t bits = 6;
+  constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 
bits).GetValue();
+  op.overflow = BitfieldOverflowBehavior::kSat;
+  op.offset = 0;
+  op.value = max * 2;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kGet;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(max, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kIncrBy;
+  for (int64_t i = 0; i <= max + 10; ++i) {
+    op.value = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(max, rets[0].value());
+    rets.clear();
+  }
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, UnsignedBitfieldSatSetTest) {
+  constexpr uint8_t bits = 6;
+  static_assert(bits < 64);
+  constexpr uint64_t max = (uint64_t(1) << bits) - 1;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, 
bits).GetValue();
+  op.overflow = BitfieldOverflowBehavior::kSat;
+  op.offset = 0;
+  op.value = max * 2;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kGet;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(max, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kIncrBy;
+  for (int64_t i = 0; i <= int64_t(max) + 10; ++i) {
+    op.value = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(max, rets[0].value());
+    rets.clear();
+  }
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, SignedBitfieldFailSetTest) {
+  constexpr uint8_t bits = 5;
+  constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 
bits).GetValue();
+  op.overflow = BitfieldOverflowBehavior::kFail;
+  op.offset = 0;
+  op.value = max * 2;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_FALSE(rets[0].has_value());
+  rets.clear();
+
+  op.value = max;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kIncrBy;
+  for (int64_t i = 1; i <= max; ++i) {
+    op.value = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_FALSE(rets[0].has_value());
+    rets.clear();
+  }
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, UnsignedBitfieldFailSetTest) {
+  constexpr uint8_t bits = 5;
+  constexpr int64_t max = (int64_t(1) << bits) - 1;
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kSet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, 
bits).GetValue();
+  op.overflow = BitfieldOverflowBehavior::kFail;
+  op.offset = 0;
+  op.value = max * 2;
+
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_FALSE(rets[0].has_value());
+  rets.clear();
+
+  op.value = max;
+  bitmap_->Bitfield(key_, {op}, &rets);
+  EXPECT_EQ(1, rets.size());
+  EXPECT_EQ(0, rets[0].value());
+  rets.clear();
+
+  op.type = BitfieldOperation::Type::kIncrBy;
+  for (int64_t i = 1; i <= max; ++i) {
+    op.value = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_FALSE(rets[0].has_value());
+    rets.clear();
+  }
+
+  auto s = bitmap_->Del(key_);
+}
+
+TEST_F(RedisBitmapTest, BitfieldStringGetSetTest) {
+  std::string str = "dan yuan ren chang jiu, qian li gong chan juan.";
+  string_->Set(key_, str);
+
+  std::vector<std::optional<BitfieldValue>> rets;
+
+  BitfieldOperation op;
+  op.type = BitfieldOperation::Type::kGet;
+  op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 
8).GetValue();
+
+  int i = 0;
+  for (auto ch : str) {
+    op.offset = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(ch, rets[0].value());
+    rets.clear();
+    i += 8;
+  }
+
+  for (; static_cast<size_t>(i) <= str.size() + 10; i += 8) {
+    op.offset = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(0, rets[0].value());
+    rets.clear();
+  }
+
+  // reverse all i8 in bitmap.
+  op.type = BitfieldOperation::Type::kSet;
+  for (int l = 0, r = static_cast<int>(str.size() - 1); l < r; ++l, --r) {
+    op.offset = l * 8;
+    op.value = str[r];
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(str[l], rets[0].value());
+    rets.clear();
+
+    op.offset = r * 8;
+    op.value = str[l];
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(str[r], rets[0].value());
+    rets.clear();
+  }
+  std::reverse(str.begin(), str.end());
+
+  // check reversed string.
+  i = 0;
+  op.type = BitfieldOperation::Type::kGet;
+  for (auto ch : str) {
+    op.offset = i;
+    bitmap_->Bitfield(key_, {op}, &rets);
+    EXPECT_EQ(1, rets.size());
+    EXPECT_EQ(ch, rets[0].value());
+    rets.clear();
+    i += 8;
+  }
+}
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go 
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index ed990ae3..b64a841f 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -912,6 +912,12 @@ func TestSlotMigrateDataType(t *testing.T) {
                for i := 10000; i < 11000; i += 2 {
                        require.NoError(t, rdb0.SetBit(ctx, keys[8], int64(i), 
1).Err())
                }
+               for i := 20000; i < 21000; i += 5 {
+                       res := rdb0.BitField(ctx, keys[8], "SET", "u5", 
strconv.Itoa(i), 23)
+                       require.NoError(t, res.Err())
+                       require.EqualValues(t, 1, len(res.Val()))
+                       require.EqualValues(t, 0, res.Val()[0])
+               }
                // 7. type sortint
                require.NoError(t, rdb0.Do(ctx, "SIADD", keys[9], 2, 4, 1, 
3).Err())
                require.NoError(t, rdb0.Do(ctx, "SIREM", keys[9], 2).Err())
@@ -957,6 +963,12 @@ func TestSlotMigrateDataType(t *testing.T) {
                for i := 0; i < 20; i += 2 {
                        require.EqualValues(t, 0, rdb1.GetBit(ctx, keys[8], 
int64(i)).Val())
                }
+               for i := 20000; i < 21000; i += 5 {
+                       res := rdb1.BitField(ctx, keys[8], "GET", "u5", 
strconv.Itoa(i))
+                       require.NoError(t, res.Err())
+                       require.EqualValues(t, 1, len(res.Val()))
+                       require.EqualValues(t, 23, res.Val()[0])
+               }
                // 7. type sortint
                require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys[9], 0, 
-1).Val())
 
diff --git a/tests/gocase/unit/type/bitmap/bitmap_test.go 
b/tests/gocase/unit/type/bitmap/bitmap_test.go
index 5b23389c..eb778622 100644
--- a/tests/gocase/unit/type/bitmap/bitmap_test.go
+++ b/tests/gocase/unit/type/bitmap/bitmap_test.go
@@ -302,4 +302,14 @@ func TestBitmap(t *testing.T) {
                Set2SetBit(t, rdb, ctx, "a", 
[]byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"))
                require.EqualValues(t, 32, rdb.BitOpOr(ctx, "x", "a", 
"b").Val())
        })
+
+       t.Run("BITFIELD on string type", func(t *testing.T) {
+               str := "zhe ge ren hen lan, shen me dou mei you liu xia."
+               require.NoError(t, rdb.Set(ctx, "str", str, 0).Err())
+               res := rdb.BitField(ctx, "str", "GET", "u8", "32", "SET", "u8", 
"32", 'r', "GET", "u8", "32")
+               require.NoError(t, res.Err())
+               require.EqualValues(t, str[4], res.Val()[0])
+               require.EqualValues(t, str[4], res.Val()[1])
+               require.EqualValues(t, 'r', res.Val()[2])
+       })
 }

Reply via email to