jja725 commented on code in PR #1541:
URL: https://github.com/apache/datafusion-ballista/pull/1541#discussion_r3037801780


##########
ballista/riffle/tests/integration_test.rs:
##########
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration test for the Riffle client against a local Riffle cluster.
+//!
+//! Requires a running Riffle cluster:
+//! - Uniffle coordinator on localhost:21000
+//! - Riffle shuffle server on localhost:21100
+//!
+//! Run with: cargo test -p ballista-riffle --test integration_test
+
+use ballista_riffle::client::RiffleClient;
+use ballista_riffle::config::RiffleConfig;
+use ballista_riffle::serde::{ipc_bytes_to_record_batches, record_batches_to_ipc_bytes};
+
+use arrow::array::{Int32Array, StringArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use std::sync::Arc;
+
+fn test_schema() -> Arc<Schema> {
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, false),
+    ]))
+}
+
+fn test_batches(schema: &Arc<Schema>) -> Vec<RecordBatch> {
+    vec![
+        RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(StringArray::from(vec!["alice", "bob", "carol"])),
+            ],
+        )
+        .unwrap(),
+        RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![4, 5])),
+                Arc::new(StringArray::from(vec!["dave", "eve"])),
+            ],
+        )
+        .unwrap(),
+    ]
+}
+
+#[tokio::test]
+#[ignore = "requires a running Riffle cluster (coordinator on localhost:21000, server on localhost:21100)"]

Review Comment:
   Done — moved to the external extension repo with testcontainers support.



##########
ballista/core/proto/ballista.proto:
##########
@@ -458,15 +454,15 @@ message TaskKilled {
 
 message ShuffleWritePartition {
   uint64 partition_id = 1;
+  string path = 2;
   uint64 num_batches = 3;
   uint64 num_rows = 4;
   uint64 num_bytes = 5;
-  optional uint64 file_id = 6;
-  bool is_sort_shuffle = 7;
-
-  // reserved after removing path
-  reserved 2;
-  reserved "path";
+  // Remote shuffle service fields (empty when using local shuffle).
+  string remote_shuffle_app_id = 6;
+  int32 remote_shuffle_id = 7;
+  string remote_shuffle_server_host = 8;

Review Comment:
   Removed all the new proto fields. The extension encodes the server info in the
   `path` field as `riffle://host:port/app_id/shuffle_id/partition_id`, so no
   Ballista proto changes are needed.



##########
ballista/core/proto/ballista.proto:
##########
@@ -458,15 +454,15 @@ message TaskKilled {
 
 message ShuffleWritePartition {
   uint64 partition_id = 1;
+  string path = 2;
   uint64 num_batches = 3;
   uint64 num_rows = 4;
   uint64 num_bytes = 5;
-  optional uint64 file_id = 6;
-  bool is_sort_shuffle = 7;
-
-  // reserved after removing path
-  reserved 2;
-  reserved "path";
+  // Remote shuffle service fields (empty when using local shuffle).
+  string remote_shuffle_app_id = 6;

Review Comment:
   Removed. This maps to `PartitionId.job_id`, so no new field is needed.



##########
ballista/core/proto/ballista.proto:
##########
@@ -458,15 +454,15 @@ message TaskKilled {
 
 message ShuffleWritePartition {
   uint64 partition_id = 1;
+  string path = 2;
   uint64 num_batches = 3;
   uint64 num_rows = 4;
   uint64 num_bytes = 5;
-  optional uint64 file_id = 6;
-  bool is_sort_shuffle = 7;
-
-  // reserved after removing path
-  reserved 2;
-  reserved "path";
+  // Remote shuffle service fields (empty when using local shuffle).
+  string remote_shuffle_app_id = 6;
+  int32 remote_shuffle_id = 7;

Review Comment:
   Removed. Maps to `PartitionId.stage_id`.



##########
ballista/core/proto/ballista.proto:
##########
@@ -458,15 +454,15 @@ message TaskKilled {
 
 message ShuffleWritePartition {
   uint64 partition_id = 1;
+  string path = 2;
   uint64 num_batches = 3;
   uint64 num_rows = 4;
   uint64 num_bytes = 5;
-  optional uint64 file_id = 6;
-  bool is_sort_shuffle = 7;
-
-  // reserved after removing path
-  reserved 2;
-  reserved "path";
+  // Remote shuffle service fields (empty when using local shuffle).
+  string remote_shuffle_app_id = 6;
+  int32 remote_shuffle_id = 7;

Review Comment:
   Yes — addressed above: this field was removed, since it maps to `PartitionId.stage_id`.



##########
ballista/core/proto/ballista.proto:
##########
@@ -458,15 +454,15 @@ message TaskKilled {
 
 message ShuffleWritePartition {
   uint64 partition_id = 1;
+  string path = 2;
   uint64 num_batches = 3;
   uint64 num_rows = 4;
   uint64 num_bytes = 5;
-  optional uint64 file_id = 6;
-  bool is_sort_shuffle = 7;
-
-  // reserved after removing path
-  reserved 2;
-  reserved "path";
+  // Remote shuffle service fields (empty when using local shuffle).
+  string remote_shuffle_app_id = 6;

Review Comment:
   Yes — addressed above: this field was removed, since it maps to `PartitionId.job_id`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to