kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1602143533
Ah, should we remove the following checks too?
We can read multiple times from endpoints that have an expiration time:
```diff
diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc
index f253f8db7..b8a75ad6d 100644
--- a/cpp/src/arrow/flight/integration_tests/test_integration.cc
+++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc
@@ -617,26 +617,20 @@ class ExpirationTimeDoGetScenario : public Scenario {
ARROW_ASSIGN_OR_RAISE(
auto info,
client->GetFlightInfo(FlightDescriptor::Command("expiration_time")));
std::vector<std::shared_ptr<arrow::Table>> tables;
- // First read from all endpoints
for (const auto& endpoint : info->endpoints()) {
ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
- tables.push_back(table);
- }
- // Re-reads only from endpoints that have expiration time
- for (const auto& endpoint : info->endpoints()) {
- if (endpoint.expiration_time.has_value()) {
- ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
- ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
- tables.push_back(table);
+ if (tables.size() == 0) {
+ if (endpoint.expiration_time.has_value()) {
+ return Status::Invalid("The first endpoint must not have
expiration time");
+ }
} else {
- auto reader = client->DoGet(endpoint.ticket);
- if (reader.ok()) {
- return Status::Invalid(
- "Data that doesn't have expiration time "
- "shouldn't be readable multiple times");
+ if (!endpoint.expiration_time.has_value()) {
+ return Status::Invalid("The ", tables.size(),
+ "-th endpoint must have expiration time");
}
}
+ tables.push_back(table);
}
ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));
@@ -645,13 +639,9 @@ class ExpirationTimeDoGetScenario : public Scenario {
ARROW_ASSIGN_OR_RAISE(auto builder,
RecordBatchBuilder::Make(schema,
arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
- // First reads
ARROW_RETURN_NOT_OK(number_builder->Append(0));
ARROW_RETURN_NOT_OK(number_builder->Append(1));
ARROW_RETURN_NOT_OK(number_builder->Append(2));
- // Re-reads only from endpoints that have expiration time
- ARROW_RETURN_NOT_OK(number_builder->Append(1));
- ARROW_RETURN_NOT_OK(number_builder->Append(2));
ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
expected_record_batch};
```
We can read from refreshed endpoints:
```diff
diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc
index f253f8db7..5f212ba2d 100644
--- a/cpp/src/arrow/flight/integration_tests/test_integration.cc
+++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc
@@ -779,13 +779,6 @@ class ExpirationTimeRefreshFlightEndpointScenario : public Scenario {
Status RunClient(std::unique_ptr<FlightClient> client) override {
ARROW_ASSIGN_OR_RAISE(auto info,
client->GetFlightInfo(FlightDescriptor::Command("expiration")));
- std::vector<std::shared_ptr<arrow::Table>> tables;
- // First read from all endpoints
- for (const auto& endpoint : info->endpoints()) {
- ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
- ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
- tables.push_back(table);
- }
// Refresh all endpoints that have expiration time
std::vector<FlightEndpoint> refreshed_endpoints;
Timestamp max_expiration_time;
@@ -811,56 +804,7 @@ class ExpirationTimeRefreshFlightEndpointScenario : public Scenario {
}
refreshed_endpoints.push_back(std::move(refreshed_endpoint));
}
- // Expire all not refreshed endpoints
- {
- std::vector<Timestamp> refreshed_expiration_times;
- for (const auto& endpoint : refreshed_endpoints) {
-
refreshed_expiration_times.push_back(endpoint.expiration_time.value());
- }
- std::sort(refreshed_expiration_times.begin(),
refreshed_expiration_times.end());
- if (refreshed_expiration_times[0] < max_expiration_time) {
- return Status::Invalid(
- "One or more refreshed expiration time "
- "are shorter than original expiration time\n",
- "Original: ", max_expiration_time.time_since_epoch().count(),
"\n",
- "Refreshed: ",
refreshed_expiration_times[0].time_since_epoch().count(),
- "\n");
- }
- if (max_expiration_time > Timestamp::clock::now()) {
- std::this_thread::sleep_for(max_expiration_time -
Timestamp::clock::now());
- }
- }
- // Re-reads only from refreshed endpoints
- for (const auto& endpoint : refreshed_endpoints) {
- ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
- ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
- tables.push_back(table);
- }
- ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));
- // Build expected table
- auto schema = arrow::schema({arrow::field("number", arrow::uint32(),
false)});
- ARROW_ASSIGN_OR_RAISE(auto builder,
- RecordBatchBuilder::Make(schema,
arrow::default_memory_pool()));
- auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
- // First reads
- ARROW_RETURN_NOT_OK(number_builder->Append(0));
- ARROW_RETURN_NOT_OK(number_builder->Append(1));
- ARROW_RETURN_NOT_OK(number_builder->Append(2));
- // Re-reads only from refreshed endpoints
- ARROW_RETURN_NOT_OK(number_builder->Append(1));
- ARROW_RETURN_NOT_OK(number_builder->Append(2));
- ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
- std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
- expected_record_batch};
- ARROW_ASSIGN_OR_RAISE(auto expected_table,
-
Table::FromRecordBatches(expected_record_batches));
-
- // Check read data
- if (!table->Equals(*expected_table)) {
- return Status::Invalid("Read data isn't expected\n", "Expected:\n",
- expected_table->ToString(), "Actual:\n",
table->ToString());
- }
return Status::OK();
}
};
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]