This is an automated email from the ASF dual-hosted git repository.
zhasheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push:
new 5aceafc 1bit gradient compression implementation (#17952)
5aceafc is described below
commit 5aceafc67d55a40d69abc446ca024c3196a55b5c
Author: Shuo Ouyang <[email protected]>
AuthorDate: Thu Mar 11 06:45:53 2021 +0800
1bit gradient compression implementation (#17952)
Co-authored-by: shuo-ouyang <[email protected]>
---
ci/docker/runtime_functions.sh | 6 +-
python/mxnet/kvstore/kvstore.py | 5 +-
src/kvstore/gradient_compression-inl.h | 139 ++++++++++++++++++++++++----
src/kvstore/gradient_compression.cc | 80 +++++++++++++----
src/kvstore/gradient_compression.cu | 10 +++
src/kvstore/gradient_compression.h | 8 +-
tests/nightly/dist_sync_kvstore.py | 119 ++++++++++++++++++++++--
tests/nightly/test_kvstore.py | 159 +++++++++++++++++++++++++--------
8 files changed, 439 insertions(+), 87 deletions(-)
diff --git a/ci/docker/runtime_functions.sh b/ci/docker/runtime_functions.sh
index fb9783d..1dc82a2 100755
--- a/ci/docker/runtime_functions.sh
+++ b/ci/docker/runtime_functions.sh
@@ -904,8 +904,10 @@ integrationtest_ubuntu_cpu_dist_kvstore() {
python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --type=gluon_type_cpu
python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py
python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --no-multiprecision
- python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --type=compressed_cpu
- python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --type=compressed_cpu --no-multiprecision
+ python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --type=compressed_cpu_1bit
+ python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --type=compressed_cpu_1bit --no-multiprecision
+ python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --type=compressed_cpu_2bit
+ python3 ../../tools/launch.py -n 7 --launcher local python3
dist_sync_kvstore.py --type=compressed_cpu_2bit --no-multiprecision
python3 ../../tools/launch.py -n 3 --launcher local python3
test_server_profiling.py
popd
}
diff --git a/python/mxnet/kvstore/kvstore.py b/python/mxnet/kvstore/kvstore.py
index ad83ad4..e37fab1 100644
--- a/python/mxnet/kvstore/kvstore.py
+++ b/python/mxnet/kvstore/kvstore.py
@@ -501,6 +501,9 @@ class KVStore(KVStoreBase):
""" Specifies type of low-bit quantization for gradient compression \
and additional arguments depending on the type of compression being
used.
+ The 1bit compression works as follows: values which are above the
threshold in the
+ gradient will be set to +1, whereas values below the threshold will be set
to -1.
+
2bit Gradient Compression takes a positive float `threshold`.
The technique works by thresholding values such that positive values
in the
gradient above threshold will be set to threshold. Negative values
whose absolute
@@ -541,7 +544,7 @@ class KVStore(KVStoreBase):
A dictionary specifying the type and parameters for gradient
compression.
The key `type` in this dictionary is a
required string argument and specifies the type of gradient
compression.
- Currently `type` can be only `2bit`
+ Currently `type` can be only `1bit` or `2bit`
Other keys in this dictionary are optional and specific to the type
of gradient compression.
"""
diff --git a/src/kvstore/gradient_compression-inl.h
b/src/kvstore/gradient_compression-inl.h
index 9b69bd1..7b906fd 100644
--- a/src/kvstore/gradient_compression-inl.h
+++ b/src/kvstore/gradient_compression-inl.h
@@ -32,13 +32,105 @@ namespace mxnet {
namespace kvstore {
// these gpu functions are defined in gradient_compression.cu
+void Quantize1BitImpl(mshadow::Stream<mshadow::gpu> *s, const
std::vector<mxnet::TBlob> &inputs,
+ const float threshold);
+void Dequantize1BitImpl(mshadow::Stream<mshadow::gpu> *s, const
std::vector<mxnet::TBlob> &inputs,
+ const float threshold);
void Quantize2BitImpl(mshadow::Stream<mshadow::gpu> *s, const
std::vector<mxnet::TBlob> &inputs,
const float threshold);
void Dequantize2BitImpl(mshadow::Stream<mshadow::gpu> *s, const
std::vector<mxnet::TBlob> &inputs,
const float threshold);
+struct quantize_1bit {
+ MSHADOW_XINLINE static void Map(int out_byte_id,
+ int original_size,
+ float *out,
+ float *grad,
+ float *residual,
+ const float threshold) {
+ // this byte contains the compressed representation of
+ // upto 8 values starting from (char*)out + out_byte_id
+ char *compr_byte = reinterpret_cast<char *>(out) + out_byte_id;
+
+ // init to 0
+ *compr_byte = 0;
+ // start and end are indices in original grad array
+ const int start = out_byte_id << 3;
+ const int end = (start + 8 <= original_size) ? start + 8 : original_size;
+
+ // masks used to quantize data
+ const uint8_t bits[] = {0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01};
+ for (int i = start; i < end; ++i) {
+ // adds gradient to existing residual to get updated grad
+ residual[i] += grad[i];
+ if (residual[i] > threshold) {
+ // set data to 1
+ *compr_byte |= bits[(i & 7)];
+ // reduce residual by 1
+ residual[i] -= 1;
+ } else {
+ // do nothing on compr_byte because it is initialized to 0
+ // add residual by 1
+ // because current position will be dequantized to -1
+ residual[i] += 1;
+ }
+ }
+ }
+};
+
+template<typename xpu>
+void Quantize1BitKernelLaunch(mshadow::Stream<xpu> *s, const
std::vector<mxnet::TBlob> &inputs,
+ const float threshold) {
+ mxnet::op::mxnet_op::Kernel<quantize_1bit, xpu>
+ ::Launch(s,
+ inputs[2].Size() * 4, // compressed array byte size
+ inputs[0].Size(), // original size
+ inputs[2].dptr<float>(), // compressed array
+ inputs[0].dptr<float>(), // original array
+ inputs[1].dptr<float>(), // residual array
+ threshold); // threshold
+}
+
+struct dequantize_1bit {
+ MSHADOW_XINLINE static void Map(int i,
+ float *out,
+ float *in,
+ const float threshold) {
+ // get position of dequantized value to fill
+ float *outval = out + i;
+ // gets byte which holds quantized value for this position
+ char *ch_ptr = reinterpret_cast < char * > (in + (i >> 5));
+ ch_ptr += ((i & 31) >> 3);
+ // masks used to quantize data
+ const uint8_t bits[] = {0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01};
+ // col denotes which bit of a byte is set for this value
+ // col=0 implies the first bit, col=1 implies the second bit,...
+ const int col = i & 7;
+ const uint8_t mask = bits[col];
+ const uint8_t masked = *ch_ptr & mask;
+ if (masked == mask) {
+ *outval = +1;
+ } else {
+ // if current position of byte is 0
+ // dequantized it to -1
+ *outval = -1;
+ }
+ }
+};
+
+template<typename xpu>
+void Dequantize1BitKernelLaunch(mshadow::Stream<xpu> *s, const
std::vector<mxnet::TBlob> &inputs,
+ const float threshold) {
+ mxnet::op::mxnet_op::Kernel<dequantize_1bit, xpu>
+ ::Launch(s,
+ inputs[1].Size(), // original size
+ inputs[1].dptr<float>(), // out array
+ inputs[0].dptr<float>(), // compressed array
+ threshold); // threshold
+}
+
struct quantize_2bit {
- MSHADOW_XINLINE static void Map(int out_block_id,
+ MSHADOW_XINLINE static void Map(int out_byte_id,
int original_size,
float *out,
float *grad,
@@ -46,15 +138,14 @@ struct quantize_2bit {
const float neg_threshold,
const float pos_threshold) {
// this block contains the compressed representation of
- // upto 16 values starting from out_block_id*16
- float *compr_block = out + out_block_id;
+ // upto 4 values starting from (char*)out + out_byte_id
+ char *compr_byte = reinterpret_cast<char *>(out) + out_byte_id;
// init to 0
- *compr_block = 0;
+ *compr_byte = 0;
// start and end are indices in original grad array
- const int start = out_block_id << 4;
- const int end = (start + 16 <= original_size) ? start + 16 : original_size;
- // cast as char* to manipulate bits of float addresses
- char *block_ptr = reinterpret_cast < char * > (compr_block);
+ const int start = out_byte_id << 2;
+ const int end = (start + 4 <= original_size) ? start + 4 : original_size;
+
// masks to set bits when value meets pos_threshold
// 0xc0 is mask when value is to be represented by the first two bits in a
char*
// 0xc0 means first two bits are set to 11
@@ -62,18 +153,16 @@ struct quantize_2bit {
// masks to set bits when value meets neg_threshold
const uint8_t negbits[] = {0x80, 0x20, 0x08, 0x02};
for (int i = start; i < end; i++) {
- // adds offset to reach appropriate byte
- char *curr_byte = block_ptr + ((i - start) >> 2);
// adds gradient to existing residual to get updated grad
residual[i] += grad[i];
if (residual[i] >= pos_threshold) {
// set data to 11
- *curr_byte |= posbits[(i & 3)];
+ *compr_byte |= posbits[(i & 3)];
// reduce residual by pos_threshold
residual[i] -= pos_threshold;
} else if (residual[i] <= neg_threshold) {
// set data to 10
- *curr_byte |= negbits[(i & 3)];
+ *compr_byte |= negbits[(i & 3)];
residual[i] -= neg_threshold;
}
}
@@ -85,13 +174,13 @@ void Quantize2BitKernelLaunch(mshadow::Stream<xpu> *s,
const std::vector<mxnet::
const float threshold) {
mxnet::op::mxnet_op::Kernel<quantize_2bit, xpu>
::Launch(s,
- inputs[2].Size(), // compressed array size
- inputs[0].Size(), // original size
- inputs[2].dptr<float>(), // compressed array
- inputs[0].dptr<float>(), // original array
- inputs[1].dptr<float>(), // residual array
- -1 *threshold, // negative threshold
- threshold); // positive threshold
+ inputs[2].Size() * 4, // compressed array byte size
+ inputs[0].Size(), // original size
+ inputs[2].dptr<float>(), // compressed array
+ inputs[0].dptr<float>(), // original array
+ inputs[1].dptr<float>(), // residual array
+ -1 *threshold, // negative threshold
+ threshold); // positive threshold
}
struct dequantize_2bit {
@@ -138,6 +227,18 @@ void Dequantize2BitKernelLaunch(mshadow::Stream<xpu> *s,
const std::vector<mxnet
threshold); // positive threshold
}
+inline void Quantize1BitImpl(mshadow::Stream<mshadow::cpu> *s,
+ const std::vector<mxnet::TBlob> &inputs,
+ const float threshold) {
+ Quantize1BitKernelLaunch(s, inputs, threshold);
+}
+
+inline void Dequantize1BitImpl(mshadow::Stream<mshadow::cpu> *s,
+ const std::vector<mxnet::TBlob> &inputs,
+ const float threshold) {
+ Dequantize1BitKernelLaunch(s, inputs, threshold);
+}
+
inline void Quantize2BitImpl(mshadow::Stream<mshadow::cpu> *s,
const std::vector<mxnet::TBlob> &inputs,
const float threshold) {
diff --git a/src/kvstore/gradient_compression.cc
b/src/kvstore/gradient_compression.cc
index 30aaec9..dbfc495 100644
--- a/src/kvstore/gradient_compression.cc
+++ b/src/kvstore/gradient_compression.cc
@@ -41,8 +41,10 @@ void GradientCompression::SetParams(const
std::vector<std::pair<std::string, std
& kwargs) {
GradientCompressionParam params;
params.InitAllowUnknown(kwargs);
- CHECK_GT(params.threshold, 0) << "threshold must be greater than 0";
- if (params.type == "2bit") {
+ if (params.type == "1bit") {
+ SetOneBitCompression(params.threshold);
+ } else if (params.type == "2bit") {
+ CHECK_GT(params.threshold, 0) << "threshold must be greater than 0 for two
bit compression";
SetTwoBitCompression(params.threshold);
} else {
LOG(FATAL) << "Unknown type for gradient compression " << params.type;
@@ -57,6 +59,11 @@ std::string GradientCompression::get_type_str() {
return std::to_string(static_cast<int>(type_));
}
+void GradientCompression::SetOneBitCompression(const float threshold) {
+ type_ = CompressionType::kOneBit;
+ threshold_ = threshold;
+}
+
void GradientCompression::SetTwoBitCompression(const float threshold) {
type_ = CompressionType::kTwoBit;
threshold_ = threshold;
@@ -83,7 +90,9 @@ void GradientCompression::DecodeParams(const std::string &s) {
}
int GradientCompression::GetCompressionFactor() {
- if (type_ == CompressionType::kTwoBit) {
+ if (type_ == CompressionType::kOneBit) {
+ return 32;
+ } else if (type_ == CompressionType::kTwoBit) {
return 16;
} else {
LOG(FATAL) << "Unsupported compression type: " << get_type_str();
@@ -106,16 +115,34 @@ void GradientCompression::Quantize(const mxnet::NDArray
&from, mxnet::NDArray *t
const int a = from.ctx().dev_mask();
const int b = to->ctx().dev_mask();
const float threshold = threshold_;
- if (type_ == CompressionType::kTwoBit) {
- if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+ if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+ if (type_ == CompressionType::kOneBit) {
+ mxnet::Engine::Get()->PushSync([from, to, residual,
threshold](mxnet::RunContext ctx) {
+ std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(),
to->data()};
+ Quantize1BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
+ }, from.ctx(), {from.var()}, {to->var(), residual->var()},
+ mxnet::FnProperty::kNormal, priority, "QuantizeCPU");
+ } else if (type_ == CompressionType::kTwoBit) {
mxnet::Engine::Get()->PushSync([from, to, residual,
threshold](mxnet::RunContext ctx) {
std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(),
to->data()};
Quantize2BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
}, from.ctx(), {from.var()}, {to->var(), residual->var()},
mxnet::FnProperty::kNormal, priority, "QuantizeCPU");
} else {
+ LOG(FATAL) << "Unsupported quantization of type " << get_type_str();
+ }
+ } else {
+ if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
#if MXNET_USE_CUDA
- if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
+ if (type_ == CompressionType::kOneBit) {
+ mxnet::Engine::Get()->PushSync([from, to, residual,
threshold](mxnet::RunContext ctx) {
+ std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(),
to->data()};
+ Quantize1BitImpl(ctx.get_stream<mshadow::gpu>(), inputs, threshold);
+ // Wait GPU kernel to complete
+ ctx.get_stream<mshadow::gpu>()->Wait();
+ }, from.ctx(), {from.var()}, {to->var(), residual->var()},
+ mxnet::FnProperty::kNormal, priority, "QuantizeGPU");
+ } else if (type_ == CompressionType::kTwoBit) {
mxnet::Engine::Get()->PushSync([from, to, residual,
threshold](mxnet::RunContext ctx) {
std::vector<mxnet::TBlob> inputs = {from.data(), residual->data(),
to->data()};
Quantize2BitImpl(ctx.get_stream<mshadow::gpu>(), inputs, threshold);
@@ -124,14 +151,14 @@ void GradientCompression::Quantize(const mxnet::NDArray
&from, mxnet::NDArray *t
}, from.ctx(), {from.var()}, {to->var(), residual->var()},
mxnet::FnProperty::kNormal, priority, "QuantizeGPU");
} else {
- LOG(FATAL) << "unknown device mask";
+ LOG(FATAL) << "Unsupported quantization of type " << get_type_str();
}
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
+ } else {
+ LOG(FATAL) << "Unknown device mask, from device mask " << a << " to
device mask " << b;
}
- } else {
- LOG(FATAL) << "Unsupported quantization of type " << get_type_str();
}
}
@@ -142,35 +169,52 @@ void GradientCompression::Dequantize(const mxnet::NDArray
&from, mxnet::NDArray
const int a = from.ctx().dev_mask();
const int b = to->ctx().dev_mask();
const float threshold = threshold_;
- if (type_ == CompressionType::kTwoBit) {
- if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+ if (a == mshadow::cpu::kDevMask && b == mshadow::cpu::kDevMask) {
+ if (type_ == CompressionType::kOneBit) {
+ mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext
ctx) {
+ std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
+ Dequantize1BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
+ }, from.ctx(), {from.var()}, {to->var()},
+ mxnet::FnProperty::kNormal, priority, "DequantizeCPU");
+ } else if (type_ == CompressionType::kTwoBit) {
mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext
ctx) {
std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
Dequantize2BitImpl(ctx.get_stream<mshadow::cpu>(), inputs, threshold);
}, from.ctx(), {from.var()}, {to->var()},
mxnet::FnProperty::kNormal, priority, "DequantizeCPU");
} else {
+ LOG(FATAL) << "Unsupported dequantization of type " << get_type_str();
+ }
+ } else {
+ if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
#if MXNET_USE_CUDA
- if (a == mshadow::gpu::kDevMask && b == mshadow::gpu::kDevMask) {
+ if (type_ == CompressionType::kOneBit) {
mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext
ctx) {
std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
- Dequantize2BitImpl(ctx.get_stream<mshadow::gpu>(), inputs,
threshold);
+ Dequantize1BitImpl(ctx.get_stream<mshadow::gpu>(), inputs,
threshold);
// Wait GPU kernel to complete
ctx.get_stream<mshadow::gpu>()->Wait();
}, from.ctx(), {from.var()}, {to->var()},
mxnet::FnProperty::kNormal, priority, "DequantizeGPU");
+ } else if (type_ == CompressionType::kTwoBit) {
+ mxnet::Engine::Get()->PushSync([from, to, threshold](mxnet::RunContext
ctx) {
+ std::vector<mxnet::TBlob> inputs = {from.data(), to->data()};
+ Dequantize2BitImpl(ctx.get_stream<mshadow::gpu>(), inputs,
threshold);
+ // Wait GPU kernel to complete
+ ctx.get_stream<mshadow::gpu>()->Wait();
+ }, from.ctx(), {from.var()}, {to->var()},
+ mxnet::FnProperty::kNormal, priority, "DequantizeGPU");
} else {
- LOG(FATAL) << "unknown device mask";
+ LOG(FATAL) << "Unsupported dequantization of type " << get_type_str();
}
#else
- LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
+ LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
+ } else {
+ LOG(FATAL) << "Unknown device mask, from device mask " << a << " to
device mask " << b;
}
- } else {
- LOG(FATAL) << "Unsupported dequantization of type " << get_type_str();
}
}
-
} // namespace kvstore
} // namespace mxnet
diff --git a/src/kvstore/gradient_compression.cu
b/src/kvstore/gradient_compression.cu
index b0d9662..c5bacc2 100644
--- a/src/kvstore/gradient_compression.cu
+++ b/src/kvstore/gradient_compression.cu
@@ -27,6 +27,16 @@
namespace mxnet {
namespace kvstore {
+void Quantize1BitImpl(mshadow::Stream<gpu>* s, const std::vector<TBlob>&
inputs,
+ const float threshold) {
+ Quantize1BitKernelLaunch(s, inputs, threshold);
+}
+
+void Dequantize1BitImpl(mshadow::Stream<gpu>* s, const std::vector<TBlob>&
inputs,
+ const float threshold) {
+ Dequantize1BitKernelLaunch(s, inputs, threshold);
+}
+
void Quantize2BitImpl(mshadow::Stream<gpu>* s, const std::vector<TBlob>&
inputs,
const float threshold) {
Quantize2BitKernelLaunch(s, inputs, threshold);
diff --git a/src/kvstore/gradient_compression.h
b/src/kvstore/gradient_compression.h
index f40b45f..5496ada 100644
--- a/src/kvstore/gradient_compression.h
+++ b/src/kvstore/gradient_compression.h
@@ -35,7 +35,7 @@ namespace mxnet {
namespace kvstore {
enum class CompressionType {
- kNone, kTwoBit
+ kNone, kOneBit, kTwoBit
};
struct GradientCompressionParam : public
dmlc::Parameter<GradientCompressionParam> {
@@ -73,6 +73,12 @@ class GradientCompression {
std::string get_type_str();
/*!
* \brief sets one bit gradient compression
+ * \param threshold float value used for thresholding gradients
+ */
+ void SetOneBitCompression(const float threshold);
+
+ /*!
* \brief sets two bit gradient compression
* \param threshold float value used for thresholding gradients
*/
diff --git a/tests/nightly/dist_sync_kvstore.py
b/tests/nightly/dist_sync_kvstore.py
index 3f5137b..1a89f5c 100644
--- a/tests/nightly/dist_sync_kvstore.py
+++ b/tests/nightly/dist_sync_kvstore.py
@@ -25,13 +25,13 @@ import mxnet as mx
import numpy as np
import numpy.random as rnd
from mxnet.test_utils import assert_almost_equal, assert_exception
-from test_kvstore import compute_expected_2bit_quantization
+from test_kvstore import compute_expected_quantization, compute_1bit,
compute_2bit
def check_diff(A, x, rank=None):
""" assert A == x
x can be scalar as well as numpy array
"""
- assert (np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(),
x.asnumpy())
+ assert (np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), x)
# setup
shape = (2, 3)
@@ -88,9 +88,8 @@ def set_optimizer(use_multiprecision):
kv.set_optimizer(mx.optimizer.create('test', rescale_grad=rate,
multi_precision=use_multiprecision))
return kv
-def init_kv_compressed(kv):
- threshold = 0.5
- kv.set_gradient_compression({'type': '2bit', 'threshold': threshold})
+def init_kv_compressed(kv, compression='2bit', threshold=.5):
+ kv.set_gradient_compression({'type': compression, 'threshold': threshold})
# init kv compression keys
for k, s in compr_keys_shapes:
kv.init(k, mx.nd.zeros(s))
@@ -230,6 +229,104 @@ def test_sync_push_pull(nrepeat):
check_big_row_sparse_keys(dtype, nrepeat)
print('worker ' + str(my_rank) + ' is done with non compression tests')
+def test_sync_1bit_compression(threshold, nrepeat):
+
+ def check_compr_pull_before_push():
+ for k, s in compr_keys_shapes:
+ val = mx.nd.ones(s)
+ kv.pull(k, val)
+ check_diff(val, 0)
+ for k, s in compr_init_keys_shapes:
+ # tests that GC is not used for init of a key
+ val = mx.nd.zeros(s)
+ kv.pull(k, val)
+ check_diff(val, 1)
+
+ def check_compr_ones():
+ for k, s in compr_keys_shapes:
+ val = mx.nd.zeros(s)
+ kv.pull(k, val)
+ curr_val = val[0][0].asnumpy()[0]
+ kv.push(k, mx.nd.ones(s))
+ out = mx.nd.zeros(s)
+ kv.pull(k, out=out)
+ newval = curr_val + rate * nworker
+ check_diff(out, newval)
+
+ def check_compr_neg_ones():
+ for k, s in compr_keys_shapes:
+ val = mx.nd.zeros(s)
+ kv.pull(k, val)
+ curr_val = val[0][0].asnumpy()[0]
+ kv.push(k, -1 * mx.nd.ones(s))
+ out = mx.nd.ones(s)
+ kv.pull(k, out=out)
+ # current value should be zero after call
+ # check_compr_ones and check_compr_neg_ones
+ check_diff(out, 0)
+
+ def check_compr_residual(threshold):
+ curr_residual = 0
+ curr_val = rate * nworker if 2 + curr_residual > threshold else -rate
* nworker
+ for k, s in compr_keys_shapes:
+ kv.push(k, 2 * mx.nd.ones(s))
+ out = mx.nd.zeros(s)
+ kv.pull(k, out)
+ check_diff(out, curr_val)
+
+ curr_residual = 1 if 2 > threshold else 3
+ curr_val += rate * nworker if 0 + curr_residual > threshold else -rate
* nworker
+ for k, s in compr_keys_shapes:
+ kv.push(k, mx.nd.zeros(s))
+ out = mx.nd.zeros(s)
+ kv.pull(k, out)
+ check_diff(out, curr_val)
+
+ curr_residual += -1 if curr_residual > threshold else +1
+ curr_val += rate * nworker if -2 + curr_residual > threshold else
-rate * nworker
+ for k, s in compr_keys_shapes:
+ kv.push(k, -2 * mx.nd.ones(s))
+ out = mx.nd.zeros(s)
+ kv.pull(k, out)
+ check_diff(out, curr_val)
+
+ def check_compr_random(threshold, nrepeat):
+ # set a seed so all workers generate same data. knowing this helps
+ # calculate expected value after pull
+ mx.random.seed(123)
+ rnd.seed(123)
+
+ # use new keys so residual is 0 for calculation of expected
+ for k,s in compr_random_keys_shapes:
+ kv.init(k, mx.nd.zeros(s))
+ for k,s in compr_random_keys_shapes:
+ curr_residual = np.zeros(s)
+ for l in range(nrepeat):
+ orig_val = mx.nd.zeros(s)
+ kv.pull(k, orig_val)
+
+ grad = mx.nd.array(rnd.rand(s[0], s[1]))
+ # creates a copy because push changes grad because of
assignment
+ grad_cpy = mx.nd.array(grad)
+ kv.push(k, grad)
+ val = mx.nd.zeros(s)
+ kv.pull(k, val)
+
+ diff = val - orig_val
+
+ # compute expected by using simulation of operator
+ compr, curr_residual, decompr =
compute_expected_quantization(grad_cpy, curr_residual, threshold, compute_1bit)
+ decompr *= nworker * rate
+ assert_almost_equal(diff.asnumpy(), decompr)
+
+ print ('worker ' + str(my_rank) + ' started with 1bit compression tests')
+ check_compr_pull_before_push()
+ check_compr_ones()
+ check_compr_neg_ones()
+ check_compr_residual(threshold)
+ check_compr_random(threshold, nrepeat)
+ print('worker ' + str(my_rank) + ' is done with 1bit compression tests')
+
def test_sync_2bit_compression(threshold, nrepeat):
def check_compr_residual(threshold):
for k, s in compr_keys_shapes:
@@ -316,17 +413,17 @@ def test_sync_2bit_compression(threshold, nrepeat):
diff = val - orig_val
# compute expected by using simulation of operator
- compr, curr_residual, decompr =
compute_expected_2bit_quantization(grad_cpy, curr_residual, threshold)
+ compr, curr_residual, decompr =
compute_expected_quantization(grad_cpy, curr_residual, threshold, compute_2bit)
decompr *= nworker * rate
assert_almost_equal(diff.asnumpy(), decompr)
- print ('worker ' + str(my_rank) + ' started with compression tests')
+ print ('worker ' + str(my_rank) + ' started with 2bit compression tests')
check_compr_pull_before_push()
check_compr_zero()
check_compr_residual(threshold)
check_compr_ones(threshold)
check_compr_random(threshold, nrepeat)
- print('worker ' + str(my_rank) + ' is done with compression tests')
+ print('worker ' + str(my_rank) + ' is done with 2bit compression tests')
def test_sync_init(gpu_tests=False):
def get_dtype(idx, cur_keys):
@@ -455,7 +552,11 @@ if __name__ == "__main__":
kv = init_kv()
kv = set_optimizer(use_multiprecision=opt.multiprecision)
test_sync_push_pull(opt.nrepeat)
- elif opt.type == 'compressed_cpu':
+ elif opt.type == 'compressed_cpu_1bit':
+ kv, threshold = init_kv_compressed(kv, '1bit', 0)
+ kv = set_optimizer(use_multiprecision=opt.multiprecision)
+ test_sync_1bit_compression(threshold, opt.nrepeat)
+ elif opt.type == 'compressed_cpu_2bit':
kv, threshold = init_kv_compressed(kv)
kv = set_optimizer(use_multiprecision=opt.multiprecision)
test_sync_2bit_compression(threshold, opt.nrepeat)
diff --git a/tests/nightly/test_kvstore.py b/tests/nightly/test_kvstore.py
index ced3ee1..65f6c4a 100755
--- a/tests/nightly/test_kvstore.py
+++ b/tests/nightly/test_kvstore.py
@@ -27,53 +27,73 @@ import copy
from mxnet.test_utils import assert_almost_equal
+
def check_diff_to_scalar(A, x, rank=None):
""" assert A == x"""
assert(np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), x)
-def compute_expected_2bit_quantization(arr, curr_residual, threshold):
- from struct import pack,unpack
- def bits2int(bits):
- bits = [int(x) for x in bits[::-1]]
- x = 0
- for i in range(len(bits)):
- x += bits[i]*2**i
- return x
+def compute_1bit(arr, curr_residual, threshold):
+ str_quant = ""
+ new_residual = []
+ decompr = []
- def as_float32(s):
- return unpack("f",pack("I", bits2int(s)))[0]
+ for idx, val in np.ndenumerate(arr):
+ val += curr_residual[idx]
+ if val > threshold:
+ str_quant += "1"
+ new_residual.append(val - 1)
+ decompr.append(1)
+ else:
+ str_quant += "0"
+ new_residual.append(val + 1)
+ decompr.append(-1)
- # str_quant stores the quantized representation as a sequence of bits
- str_quant = ''
+ # append extra bits when size of array not a factor of 32
+ if len(str_quant) != 32:
+ str_quant += "0" * (32 - len(str_quant) % 32)
+ return str_quant, new_residual, decompr
+
+def compute_2bit(arr, curr_residual, threshold):
+ str_quant = ""
new_residual = []
decompr = []
- arr_npy = arr.asnumpy()
- for i, a in np.ndenumerate(arr_npy):
- a += curr_residual[i]
- if a >= threshold:
- str_quant += '11'
- new_residual.append(a - threshold)
+ for idx, val in np.ndenumerate(arr):
+ val += curr_residual[idx]
+ if val >= threshold:
+ str_quant += "11"
+ new_residual.append(val - threshold)
decompr.append(threshold)
- elif a <= (-1*threshold):
- str_quant += '10'
- new_residual.append(a + threshold)
- decompr.append(-1*threshold)
+ elif val <= -threshold:
+ str_quant += "10"
+ new_residual.append(val + threshold)
+ decompr.append(-threshold)
else:
- str_quant += '00'
- new_residual.append(a)
+ str_quant += "00"
+ new_residual.append(val)
decompr.append(0)
+
# append extra bits when size of array not a factor of 16
- if len(str_quant)%16 != 0:
- str_quant += '0'*(16 - len(str_quant)%16)
+ if len(str_quant) % 16 != 0:
+ str_quant += "0" * (16 - len(str_quant) % 16)
+ return str_quant, new_residual, decompr
+def compute_expected_quantization(arr, curr_residual, threshold,
quantize_func):
+
+ from struct import pack,unpack
+ def as_float32(s):
+ return unpack("f",pack("I", int(s, 2)))[0]
+
+ arr_npy = arr.asnumpy()
+ # str_quant stores the quantized representation as a sequence of bits
+ str_quant, new_residual, decompr = quantize_func(arr_npy, curr_residual,
threshold)
+
compr = []
# converts the string generated into integers 32chars at a time
- i = 0
- while i<len(str_quant):
+ for i in range(0, len(str_quant), 32):
cur_float = str_quant[i+24:i+32] + str_quant[i+16:i+24] +
str_quant[i+8:i+16] + str_quant[i:i+8]
compr.append(as_float32(cur_float))
- i+=32
+
return np.array(compr), np.array(new_residual).reshape(arr.shape),
np.array(decompr).reshape(arr.shape)
## individual key interface
@@ -99,8 +119,15 @@ def test_kvstore(kv_type, stype):
assert(err < 1e-6), (err, shapes[j])
def test_compress_kvstore(kv_type, compression='2bit', threshold=0.5):
- print(kv_type + ' with ' + compression + ' compression')
+ print(kv_type + ' with ' + compression + ' compression and threshold is '
+ str(threshold))
rate = 2
+ quantize_func = None
+ if compression == '1bit':
+ quantize_func = compute_1bit
+ elif compression == '2bit':
+ quantize_func = compute_2bit
+ else:
+ raise RuntimeError("Unknown gradient compression type!")
kv = mx.kv.create(kv_type)
kv.set_gradient_compression({'type':compression, 'threshold':threshold})
kv.set_optimizer(mx.optimizer.create('test', rescale_grad=rate))
@@ -131,6 +158,56 @@ def test_compress_kvstore(kv_type, compression='2bit',
threshold=0.5):
for o in out:
assert_almost_equal(o.asnumpy(), exp)
+ def push_ones(kv, sign=1):
+ for i in range(nrepeat):
+ for j in range(len(keys)):
+ kv.push(keys[j], [sign * mx.nd.ones(shapes[j], mx.gpu(g)) for
g in range(nworker)])
+ out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in
range(nworker)]
+ kv.pull(keys[j], out=out)
+ if sign == 1:
+ exp = (i + 1) * rate * nworker *
np.ones_like(out[0].asnumpy())
+ else:
+ exp = (nrepeat - i - 1) * rate * nworker *
np.ones_like(out[0].asnumpy())
+ for o in out:
+ assert_almost_equal(o.asnumpy(), exp)
+
+ def verify_residual_1bit(kv, threshold, rate):
+ # current values must equal to zero
+ for j in range(len(keys)):
+ out = [mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)]
+ kv.pull(keys[j], out=out)
+ exp = np.zeros_like(out[0].asnumpy())
+ for o in out:
+ assert_almost_equal(o.asnumpy(), exp)
+
+ curr_residual = 0
+ curr_val = rate * nworker if 2 > threshold else -rate * nworker
+ for j in range(len(keys)):
+ kv.push(keys[j], [2 * mx.nd.ones(shapes[j], mx.gpu(g)) for g in
range(nworker)])
+ out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
+ kv.pull(keys[j], out=out)
+
+ for o in out:
+ check_diff_to_scalar(o, curr_val)
+
+ curr_residual = 1 if 2 > threshold else 3
+ curr_val += rate * nworker if 0 + curr_residual > threshold else -rate
* nworker
+ for j in range(len(keys)):
+ kv.push(keys[j], [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in
range(nworker)])
+ out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
+ kv.pull(keys[j], out=out)
+ for o in out:
+ check_diff_to_scalar(o, curr_val)
+
+ curr_residual += -1 if curr_residual > threshold else +1
+ curr_val += rate * nworker if -2 + curr_residual > threshold else
-rate * nworker
+ for j in range(len(keys)):
+ kv.push(keys[j], [-2 * mx.nd.ones(shapes[j], mx.gpu(g)) for g in
range(nworker)])
+ out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
+ kv.pull(keys[j], out=out)
+ for o in out:
+ check_diff_to_scalar(o, curr_val)
+
def push_zeros(kv):
for i in range(nrepeat):
for j in range(len(keys)):
@@ -141,7 +218,7 @@ def test_compress_kvstore(kv_type, compression='2bit',
threshold=0.5):
for o in out:
assert_almost_equal(o.asnumpy(), exp)
- def verify_residual(kv, threshold, rate):
+ def verify_residual_2bit(kv, threshold, rate):
for j in range(len(keys)):
kv.push(keys[j], [mx.nd.ones(shapes[j], mx.gpu(g))*0.4 for g in
range(nworker)])
out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
@@ -197,8 +274,8 @@ def test_compress_kvstore(kv_type, compression='2bit',
threshold=0.5):
# on cpu
sum_dequantized_vals = np.zeros(s)
for g in range(nworker):
- compr, curr_residual[g], decompr =
compute_expected_2bit_quantization(
- grads_cpy[g],
curr_residual[g], threshold)
+ compr, curr_residual[g], decompr =
compute_expected_quantization(
+ grads_cpy[g],
curr_residual[g], threshold, quantize_func)
sum_dequantized_vals += (decompr * rate)
for g in range(nworker):
@@ -206,9 +283,14 @@ def test_compress_kvstore(kv_type, compression='2bit',
threshold=0.5):
pull_init_test(kv)
pull_before_push(kv)
- push_zeros(kv)
- curval = verify_residual(kv, threshold, rate)
- check_neg(kv, -1*threshold, rate, curval)
+ if compression == '1bit':
+ push_ones(kv, sign=1)
+ push_ones(kv, sign=-1)
+ verify_residual_1bit(kv, threshold, rate)
+ elif compression == '2bit':
+ push_zeros(kv)
+ curval = verify_residual_2bit(kv, threshold, rate)
+ check_neg(kv, -1*threshold, rate, curval)
check_compr_random(kv, threshold)
## group keys interface
@@ -252,7 +334,10 @@ if __name__ == "__main__":
test_kvstore('local_allreduce_device', stype)
## compression for local kvstore happens only when reduce is on device
- test_compress_kvstore('local_allreduce_device')
+ test_compress_kvstore('local_allreduce_device', '1bit', -.5)
+ test_compress_kvstore('local_allreduce_device', '1bit', 0)
+ test_compress_kvstore('local_allreduce_device', '1bit', .5)
+ test_compress_kvstore('local_allreduce_device', '2bit', .5)
for stype in stypes:
test_group_kvstore('local_update_cpu', stype)
test_group_kvstore('local_allreduce_cpu', stype)