This is an automated email from the ASF dual-hosted git repository.

zhasheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aceafc  1bit gradient compression implementation (#17952)
5aceafc is described below

commit 5aceafc67d55a40d69abc446ca024c3196a55b5c
Author: Shuo Ouyang <[email protected]>
AuthorDate: Thu Mar 11 06:45:53 2021 +0800

    1bit gradient compression implementation (#17952)
    
    Co-authored-by: shuo-ouyang <[email protected]>
---
 ci/docker/runtime_functions.sh         |   6 +-
 python/mxnet/kvstore/kvstore.py        |   5 +-
 src/kvstore/gradient_compression-inl.h | 139 ++++++++++++++++++++++++----
 src/kvstore/gradient_compression.cc    |  80 +++++++++++++----
 src/kvstore/gradient_compression.cu    |  10 +++
 src/kvstore/gradient_compression.h     |   8 +-
 tests/nightly/dist_sync_kvstore.py     | 119 ++++++++++++++++++++++--
 tests/nightly/test_kvstore.py          | 159 +++++++++++++++++++++++++--------
 8 files changed, 439 insertions(+), 87 deletions(-)

diff --git a/ci/docker/runtime_functions.sh b/ci/docker/runtime_functions.sh
index fb9783d..1dc82a2 100755
--- a/ci/docker/runtime_functions.sh
+++ b/ci/docker/runtime_functions.sh
@@ -904,8 +904,10 @@ integrationtest_ubuntu_cpu_dist_kvstore() {
     python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --type=gluon_type_cpu
     python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py
     python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --no-multiprecision
-    python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --type=compressed_cpu
-    python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --type=compressed_cpu --no-multiprecision
+    python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --type=compressed_cpu_1bit
+    python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --type=compressed_cpu_1bit --no-multiprecision
+    python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --type=compressed_cpu_2bit
+    python3 ../../tools/launch.py -n 7 --launcher local python3 
dist_sync_kvstore.py --type=compressed_cpu_2bit --no-multiprecision
     python3 ../../tools/launch.py -n 3 --launcher local python3 
test_server_profiling.py
     popd
 }
diff --git a/python/mxnet/kvstore/kvstore.py b/python/mxnet/kvstore/kvstore.py
index ad83ad4..e37fab1 100644
--- a/python/mxnet/kvstore/kvstore.py
+++ b/python/mxnet/kvstore/kvstore.py
@@ -501,6 +501,9 @@ class KVStore(KVStoreBase):
         """ Specifies type of low-bit quantization for gradient compression \
          and additional arguments depending on the type of compression being 
used.
 
+        The 1bit compression works as follows: values which are above the 
threshold in the
+        gradient will be set to +1, whereas values below the threshold will be 
set to -1.
+
         2bit Gradient Compression takes a positive float `threshold`.
         The technique works by thresholding values such that positive values 
in the
         gradient above threshold will be set to threshold. Negative values 
whose absolute
@@ -541,7 +544,7 @@ class KVStore(KVStoreBase):
             A dictionary specifying the type and parameters for gradient 
compression.
             The key `type` in this dictionary is a
             required string argument and specifies the type of gradient 
compression.
-            Currently `type` can be only `2bit`
+            Currently `type` can be only `1bit` or `2bit`
             Other keys in this dictionary are optional and specific to the type
             of gradient compression.
         """
diff --git a/src/kvstore/gradient_compression-inl.h 
b/src/kvstore/gradient_compression-inl.h
index 9b69bd1..7b906fd 100644
--- a/src/kvstore/gradient_compression-inl.h
+++ b/src/kvstore/gradient_compression-inl.h
@@ -32,13 +32,105 @@ namespace mxnet {
 namespace kvstore {
 
 // these gpu functions are defined in gradient_compression.cu
+void Quantize1BitImpl(mshadow::Stream<mshadow::gpu> *s, const 
std::vector<mxnet::TBlob> &inputs,
+                      const float threshold);
+void Dequantize1BitImpl(mshadow::Stream<mshadow::gpu> *s, const 
std::vector<mxnet::TBlob> &inputs,
+                        const float threshold);
 void Quantize2BitImpl(mshadow::Stream<mshadow::gpu> *s, const 
std::vector<mxnet::TBlob> &inputs,
                       const float threshold);
 void Dequantize2BitImpl(mshadow::Stream<mshadow::gpu> *s, const 
std::vector<mxnet::TBlob> &inputs,
                         const float threshold);
 
+struct quantize_1bit {
+  MSHADOW_XINLINE static void Map(int out_byte_id,
+                                  int original_size,
+                                  float *out,
+                                  float *grad,
+                                  float *residual,
+                                  const float threshold) {
+    // this byte contains the compressed representation of
+    // up to 8 values starting from (char*)out + out_byte_id
+    char *compr_byte = reinterpret_cast<char *>(out) + out_byte_id;
+
+    // init to 0
+    *compr_byte = 0;
+    // start and end are indices in original grad array
+    const int start = out_byte_id << 3;
+    const int end = (start + 8 <= original_size) ? start + 8 : original_size;
+
+    // masks used to quantize data
+    const uint8_t bits[] = {0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01};
+    for (int i = start; i < end; ++i) {
+      // adds gradient to existing residual to get updated grad
+      residual[i] += grad[i];
+      if (residual[i] > threshold) {
+        // set data to 1
+        *compr_byte |= bits[(i & 7)];
+        // reduce residual by 1
+        residual[i] -= 1;
+      } else {
+        // do nothing on compr_byte because it is initialized to 0
+        // add residual by 1
+        // because current position will be dequantized to -1
+        residual[i] += 1;
+      }
+    }
+  }
+};
+
+template<typename xpu>
+void Quantize1BitKernelLaunch(mshadow::Stream<xpu> *s, const 
std::vector<mxnet::TBlob> &inputs,
+                              const float threshold) {
+  mxnet::op::mxnet_op::Kernel<quantize_1bit, xpu>
+    ::Launch(s,
+            inputs[2].Size() * 4,         // compressed array byte size
+            inputs[0].Size(),             // original size
+            inputs[2].dptr<float>(),      // compressed array
+            inputs[0].dptr<float>(),      // original array
+            inputs[1].dptr<float>(),      // residual array
+            threshold);                   // threshold
+}
+
+struct dequantize_1bit {
+  MSHADOW_XINLINE static void Map(int i,
+                                  float *out,
+                                  float *in,
+                                  const float threshold) {
+    // get position of dequantized value to fill
+    float *outval = out + i;
+    // gets byte which holds quantized value for this position
+    char *ch_ptr = reinterpret_cast < char * > (in + (i >> 5));
+    ch_ptr += ((i & 31) >> 3);
+    // masks used to quantize data
+    const uint8_t bits[] = {0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01};
+    // col denotes which bit of a byte is set for this value
+    // col=0 implies the first bit, col=1 implies the second bit,...
+    const int col = i & 7;
+    const uint8_t mask = bits[col];
+    const uint8_t masked = *ch_ptr & mask;
+    if (masked == mask) {
+      *outval = +1;
+    } else {
+      // if current position of byte is 0
+      // dequantized it to -1
+      *outval = -1;
+    }
+  }
+};
+
+template<typename xpu>
+void Dequantize1BitKernelLaunch(mshadow::Stream<xpu> *s, const 
std::vector<mxnet::TBlob> &inputs,
+                                const float threshold) {
+  mxnet::op::mxnet_op::Kernel<dequantize_1bit, xpu>
+  ::Launch(s,
+          inputs[1].Size(),         // original size
+          inputs[1].dptr<float>(),  // out array
+          inputs[0].dptr<float>(),  // compressed array
+          threshold);               // threshold
+}
+
 struct quantize_2bit {
-  MSHADOW_XINLINE static void Map(int out_block_id,
+  MSHADOW_XINLINE static void Map(int out_byte_id,
                                   int original_size,
                                   float *out,
                                   float *grad,
@@ -46,15 +138,14 @@ struct quantize_2bit {
                                   const float neg_threshold,
                                   const float pos_threshold) {
     // this block contains the compressed representation of
-    // upto 16 values starting from out_block_id*16
-    float *compr_block = out + out_block_id;
+    // up to 4 values starting from (char*)out + out_byte_id
+    char *compr_byte = reinterpret_cast<char *>(out) + out_byte_id;
     // init to 0
-    *compr_block = 0;
+    *compr_byte = 0;
     // start and end are indices in original grad array
-    const int start = out_block_id << 4;
-    const int end = (start + 16 <= original_size) ? start + 16 : original_size;
-    // cast as char* to manipulate bits of float addresses
-    char *block_ptr = reinterpret_cast < char * > (compr_block);
+    const int start = out_byte_id << 2;
+    const int end = (start + 4 <= original_size) ? start + 4 : original_size;
+
     // masks to set bits when value meets pos_threshold
     // 0xc0 is mask when value is to be represented by the first two bits in a 
char*
     // 0xc0 means first two bits are set to 11
@@ -62,18 +153,16 @@ struct quantize_2bit {
     // masks to set bits when value meets neg_threshold
     const uint8_t negbits[] = {0x80, 0x20, 0x08, 0x02};
     for (int i = start; i < end; i++) {
-      // adds offset to reach appropriate byte
-      char *curr_byte = block_ptr + ((i - start) >> 2);
       // adds gradient to existing residual to get updated grad
       residual[i] += grad[i];
       if (residual[i] >= pos_threshold) {
         // set data to 11
-        *curr_byte |= posbits[(i & 3)];
+        *compr_byte |= posbits[(i & 3)];
         // reduce residual by pos_threshold
         residual[i] -= pos_threshold;
       } else if (residual[i] <= neg_threshold) {
         // set data to 10
-        *curr_byte |= negbits[(i & 3)];
+        *compr_byte |= negbits[(i & 3)];
         residual[i] -= neg_threshold;
       }
     }
@@ -85,13 +174,13 @@ void Quantize2BitKernelLaunch(mshadow::Stream<xpu> *s, 
const std::vector<mxnet::
                               const float threshold) {
   mxnet::op::mxnet_op::Kernel<quantize_2bit, xpu>
     ::Launch(s,
-            inputs[2].Size(),         // compressed array size
-            inputs[0].Size(),         // original size
-            inputs[2].dptr<float>(),  // compressed array
-            inputs[0].dptr<float>(),  // original array
-            inputs[1].dptr<float>(),  // residual array
-            -1 *threshold,            // negative threshold
-            threshold);               // positive threshold
+            inputs[2].Size() * 4,         // compressed array byte size
+            inputs[0].Size(),             // original size
+            inputs[2].dptr<float>(),      // compressed array
+            inputs[0].dptr<float>(),      // original array
+            inputs[1].dptr<float>(),      // residual array
+            -1 *threshold,                // negative threshold
+            threshold);                   // positive threshold
 }
 
 struct dequantize_2bit {
@@ -138,6 +227,18 @@ void Dequantize2BitKernelLaunch(mshadow::Stream<xpu> *s, 
const std::vector<mxnet
           threshold);               // positive threshold
 }
 
+inline void Quantize1BitImpl(mshadow::Stream<mshadow::cpu> *s,
+                             const std::vector<mxnet::TBlob> &inputs,
+                             const float threshold) {
+  Quantize1BitKernelLaunch(s, inputs, threshold);
+}
+
+inline void Dequantize1BitImpl(mshadow::Stream<mshadow::cpu> *s,
+                               const std::vector<mxnet::TBlob> &inputs,
+                               const float threshold) {
+  Dequantize1BitKernelLaunch(s, inputs, threshold);
+}
+
 inline void Quantize2BitImpl(mshadow::Stream<mshadow::cpu> *s,
                              const std::vector<mxnet::TBlob> &inputs,
                              const float threshold) {
diff --git a/src/kvstore/gradient_compression.cc 
b/src/kvstore/gradient_compression.cc
index 30aaec9..dbfc495 100644
--- a/src/kvstore/gradient_compression.cc
+++ b/src/kvstore/gradient_compression.cc
@@ -41,8 +41,10 @@ void GradientCompression::SetParams(const 
std::vector<std::pair<std::string, std
                                     & kwargs) {
   GradientCompressionParam params;
   params.InitAllowUnknown(kwargs);
-  CHECK_GT(params.threshold, 0) << "threshold must be greater than 0";
-  if (params.type == "2bit") {
+  if (params.type == "1bit") {
+    SetOneBitCompression(params.threshold);
+  } else if (params.type == "2bit") {
+    CHECK_GT(params.threshold, 0) << "threshold must be greater than 0 for two 
bit compression";
     SetTwoBitCompression(params.threshold);
   } else {
     LOG(FATAL) << "Unknown type for gradient compression " << params.type;
@@ -57,6 +59,11 @@ std::string GradientCompression::get_type_str() {
   return std::to_string(static_cast<int>(type_));
 }
 
+void GradientCompression::SetOneBitCompression(const float threshold) {
+  type_ = CompressionType::kOneBit;
+  threshold_ = threshold;
+}
+
 void GradientCompression::SetTwoBitCompression(const float threshold) {
   type_ = CompressionType::kTwoBit;
   threshold_ = threshold;
@@ -83,7 +90,9 @@ void GradientCompression::DecodeParams(const std::string &s) {
 }
 
 int GradientCompression::GetCompressionFactor() {
-  if (type_ == CompressionType::kTwoBit) {
+  if (type_ == CompressionType::kOneBit) {
+    return 32;
+  } else if (type_ == CompressionType::kTwoBit) {
     return 16;
   } else {
     LOG(FATAL) << "Unsupported compression type: " << get_type_str();
@@ -106,16 +115,34 @@ void GradientCompression::Quantize(const mxnet::NDArray 
&from, mxnet::NDArray *t
   const int a = from.ctx().dev_mask();
   const int b = to->ctx().dev_mask();
   const float threshold = threshold_;
-  if (type_ == CompressionType::kTwoBit) {
-    if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+  if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+    if (type_ == CompressionType::kOneBit) {
+      mxnet::Engine::Get()->PushSync([from, to, residual, 
threshold](mxnet::RunContext ctx) {
+        std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(), 
to->data()};
+        Quantize1BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
+      }, from.ctx(), {from.var()}, {to->var(), residual->var()},
+      mxnet::FnProperty::kNormal, priority, "QuantizeCPU");
+    } else if (type_ == CompressionType::kTwoBit) {
       mxnet::Engine::Get()->PushSync([from, to, residual, 
threshold](mxnet::RunContext ctx) {
         std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(), 
to->data()};
         Quantize2BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
       }, from.ctx(), {from.var()}, {to->var(), residual->var()},
       mxnet::FnProperty::kNormal, priority, "QuantizeCPU");
     } else {
+      LOG(FATAL) << "Unsupported quantization of type " << get_type_str();
+    }
+  } else {
+    if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
 #if MXNET_USE_CUDA
-      if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
+      if (type_ == CompressionType::kOneBit) {
+        mxnet::Engine::Get()->PushSync([from, to, residual, 
threshold](mxnet::RunContext ctx) {
+          std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(), 
to->data()};
+          Quantize1BitImpl(ctx.get_stream<mshadow::gpu>(), inputs, threshold);
+          // Wait GPU kernel to complete
+          ctx.get_stream<mshadow::gpu>()->Wait();
+        }, from.ctx(), {from.var()}, {to->var(), residual->var()},
+        mxnet::FnProperty::kNormal, priority, "QuantizeGPU");
+      } else if (type_ == CompressionType::kTwoBit) {
         mxnet::Engine::Get()->PushSync([from, to, residual, 
threshold](mxnet::RunContext ctx) {
           std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(), 
to->data()};
           Quantize2BitImpl(ctx.get_stream<mshadow::gpu>(), inputs, threshold);
@@ -124,14 +151,14 @@ void GradientCompression::Quantize(const mxnet::NDArray 
&from, mxnet::NDArray *t
         }, from.ctx(), {from.var()}, {to->var(), residual->var()},
         mxnet::FnProperty::kNormal, priority, "QuantizeGPU");
       } else {
-        LOG(FATAL) << "unknown device mask";
+        LOG(FATAL) << "Unsupported quantization of type " << get_type_str();
       }
 #else
     LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
 #endif
+    } else {
+      LOG(FATAL) << "Unknown device mask, from device mask " << a << " to 
device mask " << b;
     }
-  } else {
-    LOG(FATAL) << "Unsupported quantization of type " << get_type_str();
   }
 }
 
@@ -142,35 +169,52 @@ void GradientCompression::Dequantize(const mxnet::NDArray 
&from, mxnet::NDArray
   const int a = from.ctx().dev_mask();
   const int b = to->ctx().dev_mask();
   const float threshold = threshold_;
-  if (type_ == CompressionType::kTwoBit) {
-    if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+  if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+    if (type_ == CompressionType::kOneBit) {
+      mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext 
ctx) {
+        std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
+        Dequantize1BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
+      }, from.ctx(), {from.var()}, {to->var()},
+      mxnet::FnProperty::kNormal, priority, "DequantizeCPU");
+    } else if (type_ == CompressionType::kTwoBit) {
       mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext 
ctx) {
         std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
         Dequantize2BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
       }, from.ctx(), {from.var()}, {to->var()},
       mxnet::FnProperty::kNormal, priority, "DequantizeCPU");
     } else {
+      LOG(FATAL) << "Unsupported dequantization of type " << get_type_str();
+    }
+  } else {
+    if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
 #if MXNET_USE_CUDA
-      if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
+      if (type_ == CompressionType::kOneBit) {
         mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext 
ctx) {
           std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
-          Dequantize2BitImpl(ctx.get_stream<mshadow::gpu>(), inputs, 
threshold);
+          Dequantize1BitImpl(ctx.get_stream<mshadow::gpu>(), inputs, 
threshold);
           // Wait GPU kernel to complete
           ctx.get_stream<mshadow::gpu>()->Wait();
         }, from.ctx(), {from.var()}, {to->var()},
         mxnet::FnProperty::kNormal, priority, "DequantizeGPU");
+      } else if (type_ == CompressionType::kTwoBit) {
+        mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext 
ctx) {
+          std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
+          Dequantize2BitImpl(ctx.get_stream<mshadow::gpu>(), inputs, 
threshold);
+          // Wait for GPU kernel to complete
+          ctx.get_stream<mshadow::gpu>()->Wait();
+        }, from.ctx(), {from.var()}, {to->var()},
+        mxnet::FnProperty::kNormal, priority, "DequantizeGPU");
       } else {
-        LOG(FATAL) << "unknown device mask";
+        LOG(FATAL) << "Unsupported dequantization of type " << get_type_str();
       }
 #else
-      LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
+    LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
 #endif
+    } else {
+      LOG(FATAL) << "Unknown device mask, from device mask " << a << " to 
device mask " << b;
     }
-  } else {
-    LOG(FATAL) << "Unsupported dequantization of type " << get_type_str();
   }
 }
-
 }  // namespace kvstore
 }  // namespace mxnet
 
diff --git a/src/kvstore/gradient_compression.cu 
b/src/kvstore/gradient_compression.cu
index b0d9662..c5bacc2 100644
--- a/src/kvstore/gradient_compression.cu
+++ b/src/kvstore/gradient_compression.cu
@@ -27,6 +27,16 @@
 
 namespace mxnet {
 namespace kvstore {
+void Quantize1BitImpl(mshadow::Stream<gpu>* s, const std::vector<TBlob>& 
inputs,
+                      const float threshold) {
+  Quantize1BitKernelLaunch(s, inputs, threshold);
+}
+
+void Dequantize1BitImpl(mshadow::Stream<gpu>* s, const std::vector<TBlob>& 
inputs,
+                        const float threshold) {
+  Dequantize1BitKernelLaunch(s, inputs, threshold);
+}
+
 void Quantize2BitImpl(mshadow::Stream<gpu>* s, const std::vector<TBlob>& 
inputs,
                       const float threshold) {
   Quantize2BitKernelLaunch(s, inputs, threshold);
diff --git a/src/kvstore/gradient_compression.h 
b/src/kvstore/gradient_compression.h
index f40b45f..5496ada 100644
--- a/src/kvstore/gradient_compression.h
+++ b/src/kvstore/gradient_compression.h
@@ -35,7 +35,7 @@ namespace mxnet {
 namespace kvstore {
 
 enum class CompressionType {
-  kNone, kTwoBit
+  kNone, kOneBit, kTwoBit
 };
 
 struct GradientCompressionParam : public 
dmlc::Parameter<GradientCompressionParam> {
@@ -73,6 +73,12 @@ class GradientCompression {
   std::string get_type_str();
 
   /*!
+   * \brief sets one bit gradient compression
+   * \param threshold float value used for thresholding gradients
+   */
+  void SetOneBitCompression(const float threshold);
+
+  /*!
    * \brief sets two bit gradient compression
    * \param threshold float value used for thresholding gradients
    */
diff --git a/tests/nightly/dist_sync_kvstore.py 
b/tests/nightly/dist_sync_kvstore.py
index 3f5137b..1a89f5c 100644
--- a/tests/nightly/dist_sync_kvstore.py
+++ b/tests/nightly/dist_sync_kvstore.py
@@ -25,13 +25,13 @@ import mxnet as mx
 import numpy as np
 import numpy.random as rnd
 from mxnet.test_utils import assert_almost_equal, assert_exception
-from test_kvstore import compute_expected_2bit_quantization
+from test_kvstore import compute_expected_quantization, compute_1bit, 
compute_2bit
 
 def check_diff(A, x, rank=None):
     """ assert A == x
         x can be scalar as well as numpy array
     """
-    assert (np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), 
x.asnumpy())
+    assert (np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), x)
 
 # setup
 shape = (2, 3)
@@ -88,9 +88,8 @@ def set_optimizer(use_multiprecision):
     kv.set_optimizer(mx.optimizer.create('test', rescale_grad=rate, 
multi_precision=use_multiprecision))
     return kv
 
-def init_kv_compressed(kv):
-    threshold = 0.5
-    kv.set_gradient_compression({'type': '2bit', 'threshold': threshold})
+def init_kv_compressed(kv, compression='2bit', threshold=.5):
+    kv.set_gradient_compression({'type': compression, 'threshold': threshold})
     # init kv compression keys
     for k, s in compr_keys_shapes:
         kv.init(k, mx.nd.zeros(s))
@@ -230,6 +229,104 @@ def test_sync_push_pull(nrepeat):
         check_big_row_sparse_keys(dtype, nrepeat)
     print('worker ' + str(my_rank) + ' is done with non compression tests')
 
+def test_sync_1bit_compression(threshold, nrepeat):
+
+    def check_compr_pull_before_push():
+        for k, s in compr_keys_shapes:
+            val = mx.nd.ones(s)
+            kv.pull(k, val)
+            check_diff(val, 0)
+        for k, s in compr_init_keys_shapes:
+            # tests that GC is not used for init of a key
+            val = mx.nd.zeros(s)
+            kv.pull(k, val)
+            check_diff(val, 1)
+
+    def check_compr_ones():
+        for k, s in compr_keys_shapes:
+            val = mx.nd.zeros(s)
+            kv.pull(k, val)
+            curr_val = val[0][0].asnumpy()[0]
+            kv.push(k, mx.nd.ones(s))
+            out = mx.nd.zeros(s)
+            kv.pull(k, out=out)
+            newval = curr_val + rate * nworker
+            check_diff(out, newval)
+    
+    def check_compr_neg_ones():
+        for k, s in compr_keys_shapes:
+            val = mx.nd.zeros(s)
+            kv.pull(k, val)
+            curr_val = val[0][0].asnumpy()[0]
+            kv.push(k, -1 * mx.nd.ones(s))
+            out = mx.nd.ones(s)
+            kv.pull(k, out=out)
+            # current value should be zero after call
+            # check_compr_ones and check_compr_neg_ones
+            check_diff(out, 0)
+    
+    def check_compr_residual(threshold):
+        curr_residual = 0
+        curr_val = rate * nworker if 2 + curr_residual > threshold else -rate 
* nworker
+        for k, s in compr_keys_shapes:
+            kv.push(k, 2 * mx.nd.ones(s))
+            out = mx.nd.zeros(s)
+            kv.pull(k, out)
+            check_diff(out, curr_val)
+
+        curr_residual = 1 if 2 > threshold else 3
+        curr_val += rate * nworker if 0 + curr_residual > threshold else -rate 
* nworker
+        for k, s in compr_keys_shapes:
+            kv.push(k, mx.nd.zeros(s))
+            out = mx.nd.zeros(s)
+            kv.pull(k, out)
+            check_diff(out, curr_val)
+
+        curr_residual += -1 if curr_residual > threshold else +1
+        curr_val += rate * nworker if -2 + curr_residual > threshold else 
-rate * nworker
+        for k, s in compr_keys_shapes:
+            kv.push(k, -2 * mx.nd.ones(s))
+            out = mx.nd.zeros(s)
+            kv.pull(k, out)
+            check_diff(out, curr_val)        
+
+    def check_compr_random(threshold, nrepeat):
+        # set a seed so all workers generate same data. knowing this helps
+        # calculate expected value after pull
+        mx.random.seed(123)
+        rnd.seed(123)
+
+        # use new keys so residual is 0 for calculation of expected
+        for k,s in compr_random_keys_shapes:
+            kv.init(k, mx.nd.zeros(s))
+        for k,s in compr_random_keys_shapes:
+            curr_residual = np.zeros(s)
+            for l in range(nrepeat):
+                orig_val = mx.nd.zeros(s)
+                kv.pull(k, orig_val)
+
+                grad = mx.nd.array(rnd.rand(s[0], s[1]))
+                # creates a copy because push changes grad because of 
assignment
+                grad_cpy = mx.nd.array(grad)
+                kv.push(k, grad)
+                val = mx.nd.zeros(s)
+                kv.pull(k, val)
+
+                diff = val - orig_val
+
+                # compute expected by using simulation of operator
+                compr, curr_residual, decompr = 
compute_expected_quantization(grad_cpy, curr_residual, threshold, compute_1bit)
+                decompr *= nworker * rate
+                assert_almost_equal(diff.asnumpy(), decompr)
+
+    print ('worker ' + str(my_rank) + ' started with 1bit compression tests')
+    check_compr_pull_before_push()
+    check_compr_ones()
+    check_compr_neg_ones()
+    check_compr_residual(threshold)
+    check_compr_random(threshold, nrepeat)
+    print('worker ' + str(my_rank) + ' is done with 1bit compression tests')   
+
 def test_sync_2bit_compression(threshold, nrepeat):
     def check_compr_residual(threshold):
         for k, s in compr_keys_shapes:
@@ -316,17 +413,17 @@ def test_sync_2bit_compression(threshold, nrepeat):
                 diff = val - orig_val
 
                 # compute expected by using simulation of operator
-                compr, curr_residual, decompr = 
compute_expected_2bit_quantization(grad_cpy, curr_residual, threshold)
+                compr, curr_residual, decompr = 
compute_expected_quantization(grad_cpy, curr_residual, threshold, compute_2bit)
                 decompr *= nworker * rate
                 assert_almost_equal(diff.asnumpy(), decompr)
 
-    print ('worker ' + str(my_rank) + ' started with compression tests')
+    print ('worker ' + str(my_rank) + ' started with 2bit compression tests')
     check_compr_pull_before_push()
     check_compr_zero()
     check_compr_residual(threshold)
     check_compr_ones(threshold)
     check_compr_random(threshold, nrepeat)
-    print('worker ' + str(my_rank) + ' is done with compression tests')
+    print('worker ' + str(my_rank) + ' is done with 2bit compression tests')
 
 def test_sync_init(gpu_tests=False):
     def get_dtype(idx, cur_keys):
@@ -455,7 +552,11 @@ if __name__ == "__main__":
         kv = init_kv()
         kv = set_optimizer(use_multiprecision=opt.multiprecision)
         test_sync_push_pull(opt.nrepeat)
-    elif opt.type == 'compressed_cpu':
+    elif opt.type == 'compressed_cpu_1bit':
+        kv, threshold = init_kv_compressed(kv, '1bit', 0)
+        kv = set_optimizer(use_multiprecision=opt.multiprecision)
+        test_sync_1bit_compression(threshold, opt.nrepeat)
+    elif opt.type == 'compressed_cpu_2bit':
         kv, threshold = init_kv_compressed(kv)
         kv = set_optimizer(use_multiprecision=opt.multiprecision)
         test_sync_2bit_compression(threshold, opt.nrepeat)
diff --git a/tests/nightly/test_kvstore.py b/tests/nightly/test_kvstore.py
index ced3ee1..65f6c4a 100755
--- a/tests/nightly/test_kvstore.py
+++ b/tests/nightly/test_kvstore.py
@@ -27,53 +27,73 @@ import copy
 
 from mxnet.test_utils import assert_almost_equal
 
+
 def check_diff_to_scalar(A, x, rank=None):
     """ assert A == x"""
     assert(np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), x)
 
-def compute_expected_2bit_quantization(arr, curr_residual, threshold):
-    from struct import pack,unpack
-    def bits2int(bits):
-        bits = [int(x) for x in bits[::-1]]
-        x = 0
-        for i in range(len(bits)):
-            x += bits[i]*2**i
-        return x
+def compute_1bit(arr, curr_residual, threshold):
+    str_quant = ""
+    new_residual = []
+    decompr = []
 
-    def as_float32(s):
-        return unpack("f",pack("I", bits2int(s)))[0]
+    for idx, val in np.ndenumerate(arr):
+        val += curr_residual[idx]
+        if val > threshold:
+            str_quant += "1"
+            new_residual.append(val - 1)
+            decompr.append(1)
+        else:
+            str_quant += "0"
+            new_residual.append(val + 1)
+            decompr.append(-1)
 
-    # str_quant stores the quantized representation as a sequence of bits
-    str_quant = ''
+    # append extra bits when size of array not a factor of 32
+    if len(str_quant) != 32:
+        str_quant += "0" * (32 - len(str_quant) % 32)
+    return str_quant, new_residual, decompr
+
+def compute_2bit(arr, curr_residual, threshold):
+    str_quant = ""
     new_residual = []
     decompr = []
 
-    arr_npy = arr.asnumpy()
-    for i, a in np.ndenumerate(arr_npy):
-        a += curr_residual[i]
-        if a >= threshold:
-            str_quant += '11'
-            new_residual.append(a - threshold)
+    for idx, val in np.ndenumerate(arr):
+        val += curr_residual[idx]
+        if val >= threshold:
+            str_quant += "11"
+            new_residual.append(val - threshold)
             decompr.append(threshold)
-        elif a <= (-1*threshold):
-            str_quant += '10'
-            new_residual.append(a + threshold)
-            decompr.append(-1*threshold)
+        elif val <= -threshold:
+            str_quant += "10"
+            new_residual.append(val + threshold)
+            decompr.append(-threshold)
         else:
-            str_quant += '00'
-            new_residual.append(a)
+            str_quant += "00"
+            new_residual.append(val)
             decompr.append(0)
+
     # append extra bits when size of array not a factor of 16
-    if len(str_quant)%16 != 0:
-        str_quant += '0'*(16 - len(str_quant)%16)
+    if len(str_quant) % 16 != 0:
+        str_quant += "0" * (16 - len(str_quant) % 16)
+    return str_quant, new_residual, decompr
 
+def compute_expected_quantization(arr, curr_residual, threshold, 
quantize_func):
+
+    from struct import pack,unpack
+    def as_float32(s):
+        return unpack("f",pack("I", int(s, 2)))[0]
+
+    arr_npy = arr.asnumpy()
+    # str_quant stores the quantized representation as a sequence of bits
+    str_quant, new_residual, decompr = quantize_func(arr_npy, curr_residual, 
threshold)
+    
     compr = []
     # converts the string generated into integers 32chars at a time
-    i = 0
-    while i<len(str_quant):
+    for i in range(0, len(str_quant), 32):
         cur_float = str_quant[i+24:i+32] + str_quant[i+16:i+24] + 
str_quant[i+8:i+16] + str_quant[i:i+8]
         compr.append(as_float32(cur_float))
-        i+=32
+
     return np.array(compr), np.array(new_residual).reshape(arr.shape), 
np.array(decompr).reshape(arr.shape)
 
 ## individual key interface
@@ -99,8 +119,15 @@ def test_kvstore(kv_type, stype):
             assert(err < 1e-6), (err, shapes[j])
 
 def test_compress_kvstore(kv_type, compression='2bit', threshold=0.5):
-    print(kv_type + ' with ' + compression + ' compression')
+    print(kv_type + ' with ' + compression + ' compression and threshold is ' 
+ str(threshold))
     rate = 2
+    quantize_func = None
+    if compression == '1bit':
+        quantize_func = compute_1bit
+    elif compression == '2bit':
+        quantize_func = compute_2bit
+    else:
+        raise RuntimeError("Unknown gradient compression type!")
     kv = mx.kv.create(kv_type)
     kv.set_gradient_compression({'type':compression, 'threshold':threshold})
     kv.set_optimizer(mx.optimizer.create('test', rescale_grad=rate))
@@ -131,6 +158,56 @@ def test_compress_kvstore(kv_type, compression='2bit', 
threshold=0.5):
                 for o in out:
                     assert_almost_equal(o.asnumpy(), exp)
 
+    def push_ones(kv, sign=1):
+        for i in range(nrepeat):
+            for j in range(len(keys)):
+                kv.push(keys[j], [sign * mx.nd.ones(shapes[j], mx.gpu(g)) for 
g in range(nworker)])
+                out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in 
range(nworker)]
+                kv.pull(keys[j], out=out)
+                if sign == 1:
+                    exp = (i + 1) * rate * nworker * 
np.ones_like(out[0].asnumpy())
+                else:
+                    exp = (nrepeat - i - 1) * rate * nworker * 
np.ones_like(out[0].asnumpy())
+                for o in out:
+                    assert_almost_equal(o.asnumpy(), exp)
+
+    def verify_residual_1bit(kv, threshold, rate):
+        # current values must equal to zero
+        for j in range(len(keys)):
+            out = [mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)]
+            kv.pull(keys[j], out=out)
+            exp = np.zeros_like(out[0].asnumpy())
+            for o in out:
+                assert_almost_equal(o.asnumpy(), exp)
+        
+        curr_residual = 0
+        curr_val = rate * nworker if 2 > threshold else -rate * nworker
+        for j in range(len(keys)):
+            kv.push(keys[j], [2 * mx.nd.ones(shapes[j], mx.gpu(g)) for g in 
range(nworker)])
+            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
+            kv.pull(keys[j], out=out)
+            
+            for o in out:
+                check_diff_to_scalar(o, curr_val)
+
+        curr_residual = 1 if 2 > threshold else 3
+        curr_val += rate * nworker if 0 + curr_residual > threshold else -rate 
* nworker
+        for j in range(len(keys)):
+            kv.push(keys[j], [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in 
range(nworker)])
+            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
+            kv.pull(keys[j], out=out)
+            for o in out:
+                check_diff_to_scalar(o, curr_val)
+
+        curr_residual += -1 if curr_residual > threshold else +1
+        curr_val += rate * nworker if -2 + curr_residual > threshold else 
-rate * nworker
+        for j in range(len(keys)):
+            kv.push(keys[j], [-2 * mx.nd.ones(shapes[j], mx.gpu(g)) for g in 
range(nworker)])
+            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
+            kv.pull(keys[j], out=out)
+            for o in out:
+                check_diff_to_scalar(o, curr_val)
+    
     def push_zeros(kv):
         for i in range(nrepeat):
             for j in range(len(keys)):
@@ -141,7 +218,7 @@ def test_compress_kvstore(kv_type, compression='2bit', 
threshold=0.5):
                 for o in out:
                     assert_almost_equal(o.asnumpy(), exp)
 
-    def verify_residual(kv, threshold, rate):
+    def verify_residual_2bit(kv, threshold, rate):
         for j in range(len(keys)):
             kv.push(keys[j], [mx.nd.ones(shapes[j], mx.gpu(g))*0.4 for g in 
range(nworker)])
             out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
@@ -197,8 +274,8 @@ def test_compress_kvstore(kv_type, compression='2bit', 
threshold=0.5):
             # on cpu
             sum_dequantized_vals = np.zeros(s)
             for g in range(nworker):
-                compr, curr_residual[g], decompr = 
compute_expected_2bit_quantization(
-                                                    grads_cpy[g], 
curr_residual[g], threshold)
+                compr, curr_residual[g], decompr = 
compute_expected_quantization(
+                                                    grads_cpy[g], 
curr_residual[g], threshold, quantize_func)
                 sum_dequantized_vals += (decompr * rate)
 
             for g in range(nworker):
@@ -206,9 +283,14 @@ def test_compress_kvstore(kv_type, compression='2bit', 
threshold=0.5):
 
     pull_init_test(kv)
     pull_before_push(kv)
-    push_zeros(kv)
-    curval = verify_residual(kv, threshold, rate)
-    check_neg(kv, -1*threshold, rate, curval)
+    if compression == '1bit':
+        push_ones(kv, sign=1)
+        push_ones(kv, sign=-1)
+        verify_residual_1bit(kv, threshold, rate)
+    elif compression == '2bit':
+        push_zeros(kv)
+        curval = verify_residual_2bit(kv, threshold, rate)
+        check_neg(kv, -1*threshold, rate, curval)
     check_compr_random(kv, threshold)
 
 ## group keys interface
@@ -252,7 +334,10 @@ if __name__ == "__main__":
         test_kvstore('local_allreduce_device', stype)
 
     ## compression for local kvstore happens only when reduce is on device
-    test_compress_kvstore('local_allreduce_device')
+    test_compress_kvstore('local_allreduce_device', '1bit', -.5)
+    test_compress_kvstore('local_allreduce_device', '1bit', 0)
+    test_compress_kvstore('local_allreduce_device', '1bit', .5)
+    test_compress_kvstore('local_allreduce_device', '2bit', .5)
     for stype in stypes:
         test_group_kvstore('local_update_cpu', stype)
         test_group_kvstore('local_allreduce_cpu', stype)

Reply via email to