Repository: incubator-hawq Updated Branches: refs/heads/master 54a9af323 -> 2662bebd1
HAWQ-1506. Fix multi-append bug when writing to an encryption zone Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2662bebd Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2662bebd Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2662bebd Branch: refs/heads/master Commit: 2662bebd163069f5742ff0b236768cd559089f28 Parents: 54a9af3 Author: interma <[email protected]> Authored: Tue Jul 25 16:50:25 2017 +0800 Committer: rlei <[email protected]> Committed: Fri Jul 28 14:25:38 2017 +0800 ---------------------------------------------------------------------- depends/libhdfs3/mock/MockCryptoCodec.h | 5 +- depends/libhdfs3/src/client/CryptoCodec.cpp | 319 ++++++++++--------- depends/libhdfs3/src/client/CryptoCodec.h | 96 +++--- .../libhdfs3/src/client/FileEncryptionInfo.h | 2 +- depends/libhdfs3/src/client/HttpClient.cpp | 2 + .../libhdfs3/src/client/OutputStreamImpl.cpp | 32 +- .../libhdfs3/test/function/TestCInterface.cpp | 160 +++++++++- .../libhdfs3/test/function/TestKmsClient.cpp | 1 - .../libhdfs3/test/unit/UnitTestCryptoCodec.cpp | 16 +- .../libhdfs3/test/unit/UnitTestOutputStream.cpp | 2 +- 10 files changed, 416 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/mock/MockCryptoCodec.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/mock/MockCryptoCodec.h b/depends/libhdfs3/mock/MockCryptoCodec.h index 4d23e11..a9a220e 100644 --- a/depends/libhdfs3/mock/MockCryptoCodec.h +++ b/depends/libhdfs3/mock/MockCryptoCodec.h @@ -30,8 +30,9 @@ class MockCryptoCodec: public Hdfs::CryptoCodec { public: MockCryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize) : CryptoCodec(encryptionInfo, kcp, bufSize) {} - MOCK_METHOD2(encode, 
std::string(const char * buffer,int64_t size)); - MOCK_METHOD2(decode, std::string(const char * buffer,int64_t size)); + + MOCK_METHOD2(init, int(CryptoMethod crypto_method, int64_t stream_offset)); + MOCK_METHOD2(cipher_wrap, std::string(const char * buffer,int64_t size)); }; #endif /* _HDFS_LIBHDFS3_MOCK_CRYPTOCODEC_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/CryptoCodec.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/CryptoCodec.cpp b/depends/libhdfs3/src/client/CryptoCodec.cpp index 6ba1b74..0ca2d16 100644 --- a/depends/libhdfs3/src/client/CryptoCodec.cpp +++ b/depends/libhdfs3/src/client/CryptoCodec.cpp @@ -25,154 +25,181 @@ using namespace Hdfs::Internal; -namespace Hdfs { - -/** - * Construct a CryptoCodec instance. - * @param encryptionInfo the encryption info of file. - * @param kcp a KmsClientProvider instance to get key from kms server. - * @param bufSize crypto buffer size. - */ -CryptoCodec::CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize) : encryptionInfo(encryptionInfo), kcp(kcp), bufSize(bufSize) -{ - - /* Init global status. */ - ERR_load_crypto_strings(); - OpenSSL_add_all_algorithms(); - OPENSSL_config(NULL); - - /* Create cipher context. */ - encryptCtx = EVP_CIPHER_CTX_new(); - cipher = NULL; - -} - -/** - * Destroy a CryptoCodec instance. - */ -CryptoCodec::~CryptoCodec() -{ - if (encryptCtx) - EVP_CIPHER_CTX_free(encryptCtx); -} - -/** - * Get decrypted key from kms. - */ -std::string CryptoCodec::getDecryptedKeyFromKms() -{ - ptree map = kcp->decryptEncryptedKey(*encryptionInfo); - std::string key; - try { - key = map.get < std::string > ("material"); - } catch (...) 
{ - THROW(HdfsIOException, "CryptoCodec : Can not get key from kms."); - } - - int rem = key.length() % 4; - if (rem) { - rem = 4 - rem; - while (rem != 0) { - key = key + "="; - rem--; - } - } - - std::replace(key.begin(), key.end(), '-', '+'); - std::replace(key.begin(), key.end(), '_', '/'); - - LOG(INFO, "CryptoCodec : getDecryptedKeyFromKms material is :%s", key.c_str()); - - key = KmsClientProvider::base64Decode(key); - return key; - - -} - -/** - * Common encode/decode buffer method. - * @param buffer the buffer to be encode/decode. - * @param size the size of buffer. - * @param enc true is for encode, false is for decode. - * @return return the encode/decode buffer. - */ -std::string CryptoCodec::endecInternal(const char * buffer, int64_t size, bool enc) -{ - std::string key = encryptionInfo->getKey(); - std::string iv = encryptionInfo->getIv(); - LOG(INFO, - "CryptoCodec : endecInternal info. key:%s, iv:%s, buffer:%s, size:%d, is_encode:%d.", - key.c_str(), iv.c_str(), buffer, size, enc); - - /* Get decrypted key from KMS */ - key = getDecryptedKeyFromKms(); - - /* Select cipher method based on the key length. */ - if (key.length() == KEY_LENGTH_256) { - cipher = EVP_aes_256_ctr(); - } else if (key.length() == KEY_LENGTH_128) { - cipher = EVP_aes_128_ctr(); - } else { - THROW(InvalidParameter, "CryptoCodec : Invalid key length."); - } - - /* Init cipher context with cipher method, encrypted key and IV from KMS. */ - int encode = enc ? 1 : 0; - if (!EVP_CipherInit_ex(encryptCtx, cipher, NULL, - (const unsigned char *) key.c_str(), - (const unsigned char *) iv.c_str(), encode)) { - LOG(WARNING, "EVP_CipherInit_ex failed"); - } - LOG(DEBUG3, "EVP_CipherInit_ex successfully"); - EVP_CIPHER_CTX_set_padding(encryptCtx, 0); - - /* Encode/decode buffer within cipher context. 
*/ - std::string result; - result.resize(size); - int offset = 0; - int remaining = size; - int len = 0; - /* If the encode/decode buffer size larger than crypto buffer size, encode/decode buffer one by one. */ - while (remaining > bufSize) { - if (!EVP_CipherUpdate(encryptCtx, (unsigned char *) &result[offset], - &len, (const unsigned char *) buffer + offset, bufSize)) { - std::string err = ERR_lib_error_string(ERR_get_error()); - THROW(HdfsIOException, "CryptoCodec : Cannot encrypt AES data %s", - err.c_str()); - } - offset += len; - remaining -= len; - LOG(DEBUG3, - "CryptoCodec : EVP_CipherUpdate successfully, result:%s, len:%d", - result.c_str(), len); - } - if (remaining) { - if (!EVP_CipherUpdate(encryptCtx, (unsigned char *) &result[offset], - &len, (const unsigned char *) buffer + offset, remaining)) { - std::string err = ERR_lib_error_string(ERR_get_error()); - THROW(HdfsIOException, "CryptoCodec : Cannot encrypt AES data %s", - err.c_str()); - } - } - - return result; -} -/** - * Encode buffer. - */ -std::string CryptoCodec::encode(const char * buffer, int64_t size) -{ - return endecInternal(buffer, size, true); -} +namespace Hdfs { -/** - * Decode buffer. 
- */ -std::string CryptoCodec::decode(const char * buffer, int64_t size) -{ - return endecInternal(buffer, size, false); -} + //copy from java HDFS code + std::string CryptoCodec::calculateIV(const std::string& initIV, unsigned long counter) { + char IV[initIV.length()]; + + int i = initIV.length(); // IV length + int j = 0; // counter bytes index + unsigned int sum = 0; + while (i-- > 0) { + // (sum >>> Byte.SIZE) is the carry for addition + sum = (initIV[i] & 0xff) + (sum >> 8); + if (j++ < 8) { // Big-endian, and long is 8 bytes length + sum += (char) counter & 0xff; + counter >>= 8; + } + IV[i] = (char) sum; + } + + return std::string(IV, initIV.length()); + } + + CryptoCodec::CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize) : + encryptionInfo(encryptionInfo), kcp(kcp), bufSize(bufSize) + { + + // Init global status + ERR_load_crypto_strings(); + OpenSSL_add_all_algorithms(); + OPENSSL_config(NULL); + + // Create cipher context + cipherCtx = EVP_CIPHER_CTX_new(); + cipher = NULL; + + padding = 0; + counter = 0; + is_init = false; + } + + CryptoCodec::~CryptoCodec() + { + if (cipherCtx) + EVP_CIPHER_CTX_free(cipherCtx); + } + + std::string CryptoCodec::getDecryptedKeyFromKms() + { + ptree map = kcp->decryptEncryptedKey(*encryptionInfo); + std::string key; + try { + key = map.get < std::string > ("material"); + } catch (...) 
{ + THROW(HdfsIOException, "CryptoCodec : Can not get key from kms."); + } + + int rem = key.length() % 4; + if (rem) { + rem = 4 - rem; + while (rem != 0) { + key = key + "="; + rem--; + } + } + + std::replace(key.begin(), key.end(), '-', '+'); + std::replace(key.begin(), key.end(), '_', '/'); + + LOG(INFO, "CryptoCodec : getDecryptedKeyFromKms material is :%s", key.c_str()); + + key = KmsClientProvider::base64Decode(key); + return key; + } + + int CryptoCodec::init(CryptoMethod crypto_method, int64_t stream_offset) { + //check already init + if (is_init) + return 0; + + // Get decrypted key from KMS + std::string key = getDecryptedKeyFromKms(); + + // Select cipher method based on the key length + uint64_t AlgorithmBlockSize = key.length(); + if (AlgorithmBlockSize == KEY_LENGTH_256) { + cipher = EVP_aes_256_ctr(); + } else if (AlgorithmBlockSize == KEY_LENGTH_128) { + cipher = EVP_aes_128_ctr(); + } else { + LOG(WARNING, "CryptoCodec : Invalid key length."); + return -1; + } + + //calculate new IV when appending a existed file + std::string iv = encryptionInfo->getIv(); + if (stream_offset > 0) { + counter = stream_offset / AlgorithmBlockSize; + padding = stream_offset % AlgorithmBlockSize; + iv = this->calculateIV(iv, counter); + } + + //judge encrypt/decrypt + int enc = 0; + method = crypto_method; + if (method == CryptoMethod::ENCRYPT) + enc = 1; + + // Init cipher context with cipher method + if (!EVP_CipherInit_ex(cipherCtx, cipher, NULL, + (const unsigned char *) key.c_str(), (const unsigned char *) iv.c_str(), + enc)) { + LOG(WARNING, "EVP_CipherInit_ex failed"); + return -1; + } + + //AES/CTR/NoPadding + EVP_CIPHER_CTX_set_padding(cipherCtx, 0); + + LOG(INFO, "CryptoCodec init success, key_length:%llu, is_encode:%d", AlgorithmBlockSize, enc); + is_init = true; + return 1; + } + + std::string CryptoCodec::cipher_wrap(const char * buffer, int64_t size) { + if (!is_init) + THROW(InvalidParameter, "CryptoCodec isn't init"); + + int offset = 0; + int 
remaining = size; + int len = 0; + int ret = 0; + + std::string in_buf(buffer,size); + std::string out_buf(size, 0); + //set necessary padding when appending a existed file + if (padding > 0) { + in_buf.insert(0, padding, 0); + out_buf.resize(padding+size); + remaining += padding; + } + + // If the encode/decode buffer size larger than crypto buffer size, encode/decode buffer one by one + while (remaining > bufSize) { + ret = EVP_CipherUpdate(cipherCtx, (unsigned char *) &out_buf[offset], &len, + (const unsigned char *)in_buf.data() + offset, bufSize); + + if (!ret) { + std::string err = ERR_lib_error_string(ERR_get_error()); + THROW(HdfsIOException, "CryptoCodec : cipher_wrap AES data failed:%s, crypto_method:%d", err.c_str(), method); + } + offset += len; + remaining -= len; + LOG(DEBUG3, "CryptoCodec : EVP_CipherUpdate successfully, len:%d", len); + } + + if (remaining) { + ret = EVP_CipherUpdate(cipherCtx, (unsigned char *) &out_buf[offset], &len, + (const unsigned char *) in_buf.data() + offset, remaining); + + if (!ret) { + std::string err = ERR_lib_error_string(ERR_get_error()); + THROW(HdfsIOException, "CryptoCodec : cipher_wrap AES data failed:%s, crypto_method:%d", err.c_str(), method); + } + + } + + //cut off padding when necessary + if (padding > 0) { + out_buf.erase(0, padding); + padding = 0; + } + + return out_buf; + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/CryptoCodec.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/CryptoCodec.h b/depends/libhdfs3/src/client/CryptoCodec.h index e45599b..cae7d3b 100644 --- a/depends/libhdfs3/src/client/CryptoCodec.h +++ b/depends/libhdfs3/src/client/CryptoCodec.h @@ -35,54 +35,68 @@ namespace Hdfs { -class CryptoCodec { -public: - /** - * Construct a CryptoCodec instance. - * @param encryptionInfo the encryption info of file. 
- * @param kcp a KmsClientProvider instance to get key from kms server. - * @param bufSize crypto buffer size. - */ - CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize); + enum CryptoMethod { + DECRYPT = 0, + ENCRYPT = 1 + }; - /** - * Destroy a CryptoCodec instance. - */ - virtual ~CryptoCodec(); + class CryptoCodec { + public: + /** + * Construct a CryptoCodec instance. + * @param encryptionInfo the encryption info of file. + * @param kcp a KmsClientProvider instance to get key from kms server. + * @param bufSize crypto buffer size. + */ + CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize); - /** - * Encode buffer. - */ - virtual std::string encode(const char * buffer, int64_t size); + /** + * Destroy a CryptoCodec instance. + */ + virtual ~CryptoCodec(); - /** - * Decode buffer. - */ - virtual std::string decode(const char * buffer, int64_t size); + /** + * encrypt/decrypt(depends on init()) buffer data + * @param buffer + * @param size + * @return encrypt/decrypt result string + */ + virtual std::string cipher_wrap(const char * buffer, int64_t size); -private: + /** + * init CryptoCodec + * @param method CryptoMethod + * @param stream_offset 0 when open a new file; file_lenght when append a existed file + * @return 1 success; 0 no need(already inited); -1 failed + */ + virtual int init(CryptoMethod crypto_method, int64_t stream_offset = 0); - /** - * Common encode/decode buffer method. - * @param buffer the buffer to be encode/decode. - * @param size the size of buffer. - * @param enc true is for encode, false is for decode. - * @return return the encode/decode buffer. - */ - std::string endecInternal(const char *buffer, int64_t size, bool enc); + private: - /** - * Get decrypted key from kms. - */ - std::string getDecryptedKeyFromKms(); + /** + * Get decrypted key from kms. 
+ */ + std::string getDecryptedKeyFromKms(); - shared_ptr<KmsClientProvider> kcp; - FileEncryptionInfo *encryptionInfo; - EVP_CIPHER_CTX *encryptCtx; - EVP_CIPHER_CTX *decryptCtx; - const EVP_CIPHER *cipher; - int32_t bufSize; -}; + /** + * calculate new IV for appending a existed file + * @param initIV + * @param counter + * @return new IV string + */ + std::string calculateIV(const std::string& initIV, unsigned long counter); + + shared_ptr<KmsClientProvider> kcp; + FileEncryptionInfo* encryptionInfo; + EVP_CIPHER_CTX* cipherCtx; + const EVP_CIPHER* cipher; + CryptoMethod method; + + bool is_init; + int32_t bufSize; + int64_t padding; + int64_t counter; + }; } #endif http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/FileEncryptionInfo.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/FileEncryptionInfo.h b/depends/libhdfs3/src/client/FileEncryptionInfo.h index 32ead6c..7584c02 100644 --- a/depends/libhdfs3/src/client/FileEncryptionInfo.h +++ b/depends/libhdfs3/src/client/FileEncryptionInfo.h @@ -81,8 +81,8 @@ public: } private: - int suite; int cryptoProtocolVersion; + int suite; std::string key; std::string iv; std::string keyName; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/HttpClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/HttpClient.cpp b/depends/libhdfs3/src/client/HttpClient.cpp index 562f599..09a74a6 100644 --- a/depends/libhdfs3/src/client/HttpClient.cpp +++ b/depends/libhdfs3/src/client/HttpClient.cpp @@ -339,6 +339,8 @@ std::string HttpClient::escape(const std::string &data) { } else { LOG(WARNING, "HttpClient : Curl in escape method is NULL"); } + std::string empty; + return empty; } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/OutputStreamImpl.cpp 
---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/OutputStreamImpl.cpp b/depends/libhdfs3/src/client/OutputStreamImpl.cpp index 4c5f869..d987295 100644 --- a/depends/libhdfs3/src/client/OutputStreamImpl.cpp +++ b/depends/libhdfs3/src/client/OutputStreamImpl.cpp @@ -255,9 +255,18 @@ void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * FileEncryptionInfo *fileEnInfo = fileStatus.getFileEncryption(); if (fileStatus.isFileEncrypted()) { if (cryptoCodec == NULL) { - auth = shared_ptr<RpcAuth> (new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod()))); - kcp = shared_ptr<KmsClientProvider> (new KmsClientProvider(auth, conf)); - cryptoCodec = shared_ptr<CryptoCodec> (new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize())); + auth = shared_ptr<RpcAuth> ( + new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod()))); + kcp = shared_ptr<KmsClientProvider> ( + new KmsClientProvider(auth, conf)); + cryptoCodec = shared_ptr<CryptoCodec> ( + new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize())); + + int64_t file_length = fileStatus.getLength(); + int ret = cryptoCodec->init(CryptoMethod::ENCRYPT, file_length); + if (ret < 0) { + THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str()); + } } } initAppend(); @@ -278,13 +287,18 @@ void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * if (fileStatus.isFileEncrypted()) { if (cryptoCodec == NULL) { auth = shared_ptr<RpcAuth>( - new RpcAuth(fs->getUserInfo(), - RpcAuth::ParseMethod(conf->getKmsMethod()))); + new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod()))); kcp = shared_ptr<KmsClientProvider>( new KmsClientProvider(auth, conf)); cryptoCodec = shared_ptr<CryptoCodec>( - new CryptoCodec(fileEnInfo, kcp, - conf->getCryptoBufferSize())); + new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize())); + + int64_t file_length 
= fileStatus.getLength(); + assert(file_length == 0); + int ret = cryptoCodec->init(CryptoMethod::ENCRYPT, file_length); + if (ret < 0) { + THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str()); + } } } closed = false; @@ -317,8 +331,10 @@ void OutputStreamImpl::append(const char * buf, int64_t size) { void OutputStreamImpl::appendInternal(const char * buf, int64_t size) { int64_t todo = size; std::string bufEncode; + if (fileStatus.isFileEncrypted()) { - bufEncode = cryptoCodec->encode(buf, size); + //encrypt buf + bufEncode = cryptoCodec->cipher_wrap(buf, size); buf = bufEncode.c_str(); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/function/TestCInterface.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/function/TestCInterface.cpp b/depends/libhdfs3/test/function/TestCInterface.cpp index 56fe07e..40f6a1b 100644 --- a/depends/libhdfs3/test/function/TestCInterface.cpp +++ b/depends/libhdfs3/test/function/TestCInterface.cpp @@ -134,6 +134,20 @@ static void bufferMD5(const char* strFilePath, int size, char* result) { } } +static void diff_file2buffer(const char *file_path, const char *buf) { + std::cout << "diff file: " << file_path << std::endl; + char resultFile[33] = { 0 }; + char resultBuffer[33] = { 0 }; + + fileMD5(file_path, resultFile); + std::cout << "resultFile is " << resultFile << std::endl; + + bufferMD5(buf, strlen(buf), resultBuffer); + std::cout << "resultBuf is " << resultBuffer << std::endl; + + ASSERT_STREQ(resultFile, resultBuffer); +} + bool CheckFileContent(hdfsFS fs, const char * path, int64_t len, size_t offset) { hdfsFile in = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0); @@ -246,7 +260,6 @@ TEST(TestCInterfaceConnect, TestConnect_Success) { TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) { hdfsFS fs = NULL; hdfsEncryptionZoneInfo * enInfo = NULL; - char * uri = NULL; setenv("LIBHDFS3_CONF", 
"function-test.xml", 1); struct hdfsBuilder * bld = hdfsNewBuilder(); assert(bld != NULL); @@ -256,7 +269,7 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) { system("hadoop fs -rmr /TDE"); system("hadoop key create keytde"); system("hadoop fs -mkdir /TDE"); - ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde")); + ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde")); enInfo = hdfsGetEZForPath(fs, "/TDE"); ASSERT_TRUE(enInfo != NULL); EXPECT_TRUE(enInfo->mKeyName != NULL); @@ -274,11 +287,10 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) { system(tdeKey.c_str()); system(mkTde.c_str()); ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, tde.c_str(), key.c_str())); - } - hdfsEncryptionZoneInfo * enZoneInfos = NULL; + } int num = 0; hdfsListEncryptionZones(fs, &num); - EXPECT_EQ(num, 12); + EXPECT_EQ(num, 12); ASSERT_EQ(hdfsDisconnect(fs), 0); hdfsFreeBuilder(bld); } @@ -286,7 +298,6 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) { TEST(TestCInterfaceTDE, TestAppendWithTDE_Success) { hdfsFS fs = NULL; hdfsEncryptionZoneInfo * enInfo = NULL; - char * uri = NULL; setenv("LIBHDFS3_CONF", "function-test.xml", 1); struct hdfsBuilder * bld = hdfsNewBuilder(); assert(bld != NULL); @@ -327,7 +338,6 @@ TEST(TestCInterfaceTDE, TestAppendWithTDE_Success) { TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) { hdfsFS fs = NULL; hdfsEncryptionZoneInfo * enInfo = NULL; - char * uri = NULL; setenv("LIBHDFS3_CONF", "function-test.xml", 1); struct hdfsBuilder * bld = hdfsNewBuilder(); assert(bld != NULL); @@ -348,6 +358,7 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) { const char *tdefile = "/TDE/testfile"; ASSERT_TRUE(CreateFile(fs, tdefile, 0, 0)); + //case1: append int size = 1024 * 32; size_t offset = 0; hdfsFile out; @@ -371,15 +382,10 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) { } while (0); system("rm -rf ./testfile"); system("hadoop fs -get /TDE/testfile ./"); - char resultFile[33] = 
{ 0 }; - fileMD5("./testfile", resultFile); - std::cout << "resultFile is " << resultFile << std::endl; - char resultBuffer[33] = { 0 }; - LOG(INFO, "buffer is %s", &buffer[0]); - bufferMD5(&buffer[0], size, resultBuffer); - std::cout << "result is " << resultBuffer << std::endl; - ASSERT_STREQ(resultFile, resultBuffer); + diff_file2buffer("testfile", &buffer[0]); system("rm ./testfile"); + + //case5: a large file (> 64M) TODO system("hadoop fs -rmr /TDE"); system("hadoop key delete keytde4append -f"); ASSERT_EQ(hdfsDisconnect(fs), 0); @@ -387,6 +393,106 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) { } +TEST(TestCInterfaceTDE, TestAppendMultiTimes_Success) { + hdfsFS fs = NULL; + hdfsEncryptionZoneInfo * enInfo = NULL; + setenv("LIBHDFS3_CONF", "function-test.xml", 1); + struct hdfsBuilder * bld = hdfsNewBuilder(); + assert(bld != NULL); + hdfsBuilderSetNameNode(bld, "default"); + fs = hdfsBuilderConnect(bld); + ASSERT_TRUE(fs != NULL); + + //creake iey and encryption zone + system("hadoop fs -rmr /TDE"); + system("hadoop key delete keytde4append -f"); + system("hadoop key create keytde4append"); + system("hadoop fs -mkdir /TDE"); + ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde4append")); + enInfo = hdfsGetEZForPath(fs, "/TDE"); + ASSERT_TRUE(enInfo != NULL); + EXPECT_TRUE(enInfo->mKeyName != NULL); + hdfsFreeEncryptionZoneInfo(enInfo, 1); + + hdfsFile out; + //case2: close and append + const char *tdefile2 = "/TDE/testfile2"; + char out_data2[] = "12345678"; + ASSERT_TRUE(CreateFile(fs, tdefile2, 0, 0)); + out = hdfsOpenFile(fs, tdefile2, O_WRONLY | O_APPEND, 0, 0, 0); + hdfsWrite(fs, out, out_data2, 4); + hdfsCloseFile(fs, out); + + out = hdfsOpenFile(fs, tdefile2, O_WRONLY | O_APPEND, 0, 0, 0); + hdfsWrite(fs, out, out_data2+4, 4); + hdfsCloseFile(fs, out); + system("rm ./testfile2"); + system("hadoop fs -get /TDE/testfile2 ./"); + diff_file2buffer("testfile2", out_data2); + + //case3: multi-append + const char *tdefile3 = 
"/TDE/testfile3"; + char out_data3[] = "1234567812345678123456781234567812345678123456781234567812345678"; //16*4byte + ASSERT_TRUE(CreateFile(fs, tdefile3, 0, 0)); + out = hdfsOpenFile(fs, tdefile3, O_WRONLY | O_APPEND, 0, 0, 0); + hdfsWrite(fs, out, out_data3, 5); + hdfsWrite(fs, out, out_data3+5, 28); + hdfsWrite(fs, out, out_data3+33, 15); + hdfsWrite(fs, out, out_data3+48, 16); + hdfsCloseFile(fs, out); + system("rm ./testfile3"); + system("hadoop fs -get /TDE/testfile3 ./"); + diff_file2buffer("testfile3", out_data3); + + + //case4: multi-append > bufsize(8k) + const char *tdefile4 = "/TDE/testfile4"; + int data_size = 13*1024+1; + char *out_data4 = (char *)malloc(data_size); + Hdfs::FillBuffer(out_data4, data_size-1, 1024); + out_data4[data_size-1] = 0; + ASSERT_TRUE(CreateFile(fs, tdefile4, 0, 0)); + out = hdfsOpenFile(fs, tdefile4, O_WRONLY | O_APPEND, 0, 0, 0); + + int todo = 0; + int offset = 0; + todo = 9*1024-1; + while (todo > 0) { + int rc = 0; + if (0 > (rc = hdfsWrite(fs, out, out_data4+offset, todo))) { + break; + } + todo -= rc; + offset += rc; + } + + todo = 4*1024+1; + while (todo > 0) { + int rc = 0; + if (0 > (rc = hdfsWrite(fs, out, out_data4+offset, todo))) { + break; + } + todo -= rc; + offset += rc; + } + + + ASSERT_EQ(data_size-1, offset); + + hdfsCloseFile(fs, out); + system("rm ./testfile4"); + system("hadoop fs -get /TDE/testfile4 ./"); + diff_file2buffer("testfile4", out_data4); + free(out_data4); + + + + system("hadoop fs -rmr /TDE"); + system("hadoop key delete keytde4append -f"); + ASSERT_EQ(hdfsDisconnect(fs), 0); + hdfsFreeBuilder(bld); +} + TEST(TestErrorMessage, TestErrorMessage) { EXPECT_NO_THROW(hdfsGetLastError()); hdfsChown(NULL, TEST_HDFS_PREFIX, NULL, NULL); @@ -1773,3 +1879,27 @@ TEST_F(TestCInterface, TestGetHosts_Success) { hdfsFreeHosts(hosts); hdfsCloseFile(fs, out); } + +// test concurrent write to a same file +// expected: +// At any point there can only be 1 writer. 
+// This is enforced by requiring the writer to acquire leases. +TEST_F(TestCInterface, TestConcurrentWrite_Failure) { + hdfsFS fs = NULL; + setenv("LIBHDFS3_CONF", "function-test.xml", 1); + struct hdfsBuilder * bld = hdfsNewBuilder(); + assert(bld != NULL); + hdfsBuilderSetNameNode(bld, "default"); + fs = hdfsBuilderConnect(bld); + ASSERT_TRUE(fs != NULL); + + const char *file_path = BASE_DIR "/concurrent_write"; + char buf[] = "1234"; + hdfsFile fout1 = hdfsOpenFile(fs, file_path, O_WRONLY | O_APPEND, 0, 0, 0); + hdfsFile fout2 = hdfsOpenFile(fs, file_path, O_WRONLY | O_APPEND, 0, 0, 0); + ASSERT_TRUE(fout2 == NULL); //must failed + int rc = hdfsWrite(fs, fout1, buf, sizeof(buf)-1); + ASSERT_TRUE(rc > 0); + int retval = hdfsCloseFile(fs, fout1); + ASSERT_TRUE(retval == 0); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/function/TestKmsClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/function/TestKmsClient.cpp b/depends/libhdfs3/test/function/TestKmsClient.cpp index d997f88..21280de 100644 --- a/depends/libhdfs3/test/function/TestKmsClient.cpp +++ b/depends/libhdfs3/test/function/TestKmsClient.cpp @@ -186,7 +186,6 @@ TEST_F(TestKmsClient, DecryptEncryptedKeySuccess) { TEST_F(TestKmsClient, CreateKeyFailediBadUrl) { std::string keyName = "testcreatekeyfailname"; std::string cipher = "AES/CTR/NoPadding"; - int length = 128; std::string material = "testCreateKey"; std::string url[4] = { "ftp:///http@localhost:16000/kms", http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp b/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp index 36c67b1..92e9403 100644 --- a/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp +++ 
b/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp @@ -111,7 +111,8 @@ TEST_F(TestCryptoCodec, encode_Success) { //char buf[1024] = "encode hello world"; char buf[1024]; - Hdfs::FillBuffer(buf, sizeof(buf), 2048); + Hdfs::FillBuffer(buf, sizeof(buf)-1, 2048); + buf[sizeof(buf)-1] = 0; int32_t bufSize = 1024; @@ -121,13 +122,20 @@ TEST_F(TestCryptoCodec, encode_Success) { encryptionInfo.setKey(Key[i]); shared_ptr<MockHttpClient> hc(new MockHttpClient()); kcp->setHttpClient(hc); - CryptoCodec es(&encryptionInfo, kcp, bufSize); + EXPECT_CALL(*kcp, decryptEncryptedKey(_)).Times(2).WillRepeatedly( Return(kcp->getEDKResult(encryptionInfo))); - std::string encodeStr = es.encode(buf, strlen(buf)); + + CryptoCodec es(&encryptionInfo, kcp, bufSize); + es.init(CryptoMethod::ENCRYPT); + CryptoCodec ds(&encryptionInfo, kcp, bufSize); + ds.init(CryptoMethod::DECRYPT); + + + std::string encodeStr = es.cipher_wrap(buf, strlen(buf)); ASSERT_NE(0, memcmp(buf, encodeStr.c_str(), strlen(buf))); - std::string decodeStr = es.decode(encodeStr.c_str(), strlen(buf)); + std::string decodeStr = ds.cipher_wrap(encodeStr.c_str(), strlen(buf)); ASSERT_STREQ(decodeStr.c_str(), buf); } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp b/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp index b8b6a46..de36eac 100644 --- a/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp +++ b/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp @@ -406,7 +406,7 @@ TEST_F(TestOutputStream, appendEncryption_Success) { EXPECT_CALL(*pipelineStub, close(_)).Times(2).WillOnce(Return(lastBlock)).WillOnce(Return(lastBlock)); EXPECT_CALL(*fs, fsync(_)).Times(2); std::string bufferEn; - EXPECT_CALL(*cryptoC, encode(_,_)).Times(1).WillOnce(Return(bufferEn)); + EXPECT_CALL(*cryptoC, 
cipher_wrap(_,_)).Times(1).WillOnce(Return(bufferEn)); EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer))); EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock)); EXPECT_CALL(*fs, fsync(_)).Times(1);
