marin-ma commented on code in PR #5675:
URL: https://github.com/apache/incubator-gluten/pull/5675#discussion_r1604269082
##########
cpp/core/shuffle/HashPartitioner.cc:
##########
@@ -16,9 +16,28 @@
*/
#include "shuffle/HashPartitioner.h"
+#include <iostream>
Review Comment:
Please remove this `#include <iostream>`.
##########
cpp/core/shuffle/Options.h:
##########
@@ -56,6 +63,7 @@ struct PartitionWriterOptions {
int32_t compressionThreshold = kDefaultCompressionThreshold;
arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
+ std::string compressionTypeStr = "lz4";
Review Comment:
Use `kDefaultCompressionTypeStr` here instead of the hard-coded `"lz4"` literal.
##########
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "velox/common/time/CpuWallTimer.h"
+#include "velox/serializers/PrestoSerializer.h"
+#include "velox/type/Type.h"
+#include "velox/vector/ComplexVector.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/VectorStream.h"
+
+#include <arrow/array/util.h>
+#include <arrow/ipc/writer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/record_batch.h>
+#include <arrow/result.h>
+#include <arrow/type.h>
+
+#include "VeloxShuffleWriter.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/PartitionWriter.h"
+#include "shuffle/Partitioner.h"
+#include "shuffle/Utils.h"
+
+#include "utils/Print.h"
+
+namespace gluten {
+
+// set 1 to open print
+#define VELOX_SHUFFLE_WRITER_PRINT 0
+
+#if VELOX_SHUFFLE_WRITER_PRINT
+
+#define VsPrint Print
+#define VsPrintLF PrintLF
+#define VsPrintSplit PrintSplit
+#define VsPrintSplitLF PrintSplitLF
+#define VsPrintVectorRange PrintVectorRange
+#define VS_PRINT PRINT
+#define VS_PRINTLF PRINTLF
+#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME
+#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE
+#define VS_PRINT_CONTAINER PRINT_CONTAINER
+#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING
+#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING
+#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING
+#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING
+#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING
+
+#else // VELOX_SHUFFLE_WRITER_PRINT
+
+#define VsPrint(...) // NOLINT
+#define VsPrintLF(...) // NOLINT
+#define VsPrintSplit(...) // NOLINT
+#define VsPrintSplitLF(...) // NOLINT
+#define VsPrintVectorRange(...) // NOLINT
+#define VS_PRINT(a)
+#define VS_PRINTLF(a)
+#define VS_PRINT_FUNCTION_NAME()
+#define VS_PRINT_FUNCTION_SPLIT_LINE()
+#define VS_PRINT_CONTAINER(c)
+#define VS_PRINT_CONTAINER_TO_STRING(c)
+#define VS_PRINT_CONTAINER_2_STRING(c)
+#define VS_PRINT_VECTOR_TO_STRING(v)
+#define VS_PRINT_VECTOR_2_STRING(v)
+#define VS_PRINT_VECTOR_MAPPING(v)
+
+#endif // end of VELOX_SHUFFLE_WRITER_PRINT
Review Comment:
Are these macros used by the sort shuffle writer? If not, please remove.
##########
cpp/core/shuffle/LocalPartitionWriter.cc:
##########
@@ -541,6 +541,10 @@ arrow::Status LocalPartitionWriter::evict(
return arrow::Status::OK();
}
+arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) {
+ return arrow::Status::OK();
Review Comment:
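Return `arrow::Status::NotImplemented("Invalid code path for local shuffle writer: sort based is not supported.");` instead of `arrow::Status::OK()`, so an unsupported call fails explicitly rather than silently succeeding.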
##########
cpp/velox/memory/VeloxColumnarBatch.h:
##########
@@ -46,7 +46,7 @@ class VeloxColumnarBatch final : public ColumnarBatch {
facebook::velox::RowVectorPtr getRowVector() const;
facebook::velox::RowVectorPtr getFlattenedRowVector();
- private:
+ public:
Review Comment:
Is the change from `private:` to `public:` intentional? If not, please revert it.
##########
cpp/core/shuffle/Options.h:
##########
@@ -25,29 +25,36 @@
namespace gluten {
static constexpr int16_t kDefaultBatchSize = 4096;
-static constexpr int16_t kDefaultShuffleWriterBufferSize = 4096;
+static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
+static constexpr int64_t kDefaultSortBufferThreshold = 64000000000;
+static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;
+enum ShuffleWriterType { kHashShuffle, kSortShuffle };
enum PartitionWriterType { kLocal, kRss };
struct ShuffleReaderOptions {
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
+ std::string compressionTypeStr = "lz4";
Review Comment:
Please add `static const std::string kDefaultCompressionTypeStr = "lz4";` alongside the other default constants above, and use `kDefaultCompressionTypeStr` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]