CurtHagenlocher commented on code in PR #2766: URL: https://github.com/apache/arrow-adbc/pull/2766#discussion_r2072006269
########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +590,137 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + private async Task<QueryResult> GetColumnsExtendedAsync(CancellationToken cancellationToken = default) + { + // 1. Get all three results at once + var columnsResult = await GetColumnsAsync(cancellationToken); + if (columnsResult.Stream == null) return columnsResult; + + var pkResult = await GetPrimaryKeysAsync(cancellationToken); + + // For FK lookup, we need to pass in the current catalog/schema/table as the foreign table + var resp = await Connection.GetCrossReferenceAsync( + null, + null, + null, + CatalogName, + SchemaName, + TableName, + cancellationToken); + + var fkResult = await GetQueryResult(resp.DirectResults, cancellationToken); + + + // 2. Read all batches into memory + RecordBatch? columnsBatch = null; + RecordBatch? pkBatch = null; + RecordBatch? fkBatch = null; + + // Extract column data + using (var stream = columnsResult.Stream) + { + columnsBatch = await stream.ReadNextRecordBatchAsync(cancellationToken); Review Comment: So we're guaranteed, then, that all the results will be in a single `RecordBatch`? Consider adding a comment to that effect. ########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +590,137 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + private async Task<QueryResult> GetColumnsExtendedAsync(CancellationToken cancellationToken = default) + { + // 1. Get all three results at once + var columnsResult = await GetColumnsAsync(cancellationToken); + if (columnsResult.Stream == null) return columnsResult; + + var pkResult = await GetPrimaryKeysAsync(cancellationToken); + + // For FK lookup, we need to pass in the current catalog/schema/table as the foreign table + var resp = await Connection.GetCrossReferenceAsync( + null, + null, + null, + CatalogName, + SchemaName, + TableName, + cancellationToken); + + var fkResult = await GetQueryResult(resp.DirectResults, cancellationToken); + + + // 2. Read all batches into memory + RecordBatch? columnsBatch = null; + RecordBatch? pkBatch = null; + RecordBatch? fkBatch = null; + + // Extract column data + using (var stream = columnsResult.Stream) + { + columnsBatch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (columnsBatch == null) return columnsResult; + } + + // 3. Find column name index + var colNameIndex = columnsResult.Stream.Schema.GetFieldIndex("COLUMN_NAME"); + if (colNameIndex < 0) return columnsResult; // Can't match without column names + + // Get column names + var colNames = (StringArray)columnsBatch.Column(colNameIndex); + + // 4. Create combined schema and prepare data + var allFields = new List<Field>(columnsResult.Stream.Schema.FieldsList); + var combinedData = new List<IArrowArray>(); + + // Add all columns data + for (int i = 0; i < columnsBatch.ColumnCount; i++) + { + combinedData.Add(columnsBatch.Column(i)); + } + + // 5. Process PK and FK data using helper methods with selected fields + ProcessRelationshipData(pkResult, "PK_", "COLUMN_NAME", + new[] { "COLUMN_NAME", "KEY_SEQ" }, // Selected PK fields + ref pkBatch, colNames, columnsBatch.Length, + ref allFields, combinedData, cancellationToken); + + ProcessRelationshipData(fkResult, "FK_", "FKCOLUMN_NAME", + new[] { "PKCOLUMN_NAME", "PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "FKCOLUMN_NAME" }, // Selected FK fields + ref fkBatch, colNames, columnsBatch.Length, + ref allFields, combinedData, cancellationToken); + + // 6. Return the combined result + var combinedSchema = new Schema(allFields, columnsResult.Stream.Schema.Metadata); + return new QueryResult(columnsBatch.Length, new HiveServer2Connection.HiveInfoArrowStream(combinedSchema, combinedData)); + } + + // Helper method to process relationship data (PK or FK) with selected fields + private void ProcessRelationshipData(QueryResult result, string prefix, string columnNameField, + string[] includeFields, ref RecordBatch? batch, StringArray colNames, int rowCount, + ref List<Field> allFields, List<IArrowArray> combinedData, CancellationToken cancellationToken) Review Comment: `allFields` does not need to be passed as a `ref` argument because it's not being reassigned. `List<>` is a reference type so mutations happen in-place. ########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +590,137 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + private async Task<QueryResult> GetColumnsExtendedAsync(CancellationToken cancellationToken = default) + { + // 1. Get all three results at once + var columnsResult = await GetColumnsAsync(cancellationToken); + if (columnsResult.Stream == null) return columnsResult; + + var pkResult = await GetPrimaryKeysAsync(cancellationToken); + + // For FK lookup, we need to pass in the current catalog/schema/table as the foreign table + var resp = await Connection.GetCrossReferenceAsync( + null, + null, + null, + CatalogName, + SchemaName, + TableName, + cancellationToken); + + var fkResult = await GetQueryResult(resp.DirectResults, cancellationToken); + + + // 2. Read all batches into memory + RecordBatch? columnsBatch = null; + RecordBatch? pkBatch = null; + RecordBatch? fkBatch = null; + + // Extract column data + using (var stream = columnsResult.Stream) + { + columnsBatch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (columnsBatch == null) return columnsResult; + } + + // 3. Find column name index + var colNameIndex = columnsResult.Stream.Schema.GetFieldIndex("COLUMN_NAME"); + if (colNameIndex < 0) return columnsResult; // Can't match without column names + + // Get column names + var colNames = (StringArray)columnsBatch.Column(colNameIndex); + + // 4. Create combined schema and prepare data + var allFields = new List<Field>(columnsResult.Stream.Schema.FieldsList); + var combinedData = new List<IArrowArray>(); + + // Add all columns data + for (int i = 0; i < columnsBatch.ColumnCount; i++) + { + combinedData.Add(columnsBatch.Column(i)); + } + + // 5. Process PK and FK data using helper methods with selected fields + ProcessRelationshipData(pkResult, "PK_", "COLUMN_NAME", + new[] { "COLUMN_NAME", "KEY_SEQ" }, // Selected PK fields + ref pkBatch, colNames, columnsBatch.Length, + ref allFields, combinedData, cancellationToken); + + ProcessRelationshipData(fkResult, "FK_", "FKCOLUMN_NAME", + new[] { "PKCOLUMN_NAME", "PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "FKCOLUMN_NAME" }, // Selected FK fields + ref fkBatch, colNames, columnsBatch.Length, + ref allFields, combinedData, cancellationToken); + + // 6. Return the combined result + var combinedSchema = new Schema(allFields, columnsResult.Stream.Schema.Metadata); + return new QueryResult(columnsBatch.Length, new HiveServer2Connection.HiveInfoArrowStream(combinedSchema, combinedData)); + } + + // Helper method to process relationship data (PK or FK) with selected fields + private void ProcessRelationshipData(QueryResult result, string prefix, string columnNameField, + string[] includeFields, ref RecordBatch? batch, StringArray colNames, int rowCount, + ref List<Field> allFields, List<IArrowArray> combinedData, CancellationToken cancellationToken) + { + if (result.Stream == null) return; + + // Build column map and read batch + Dictionary<string, int> map = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase); + using (var stream = result.Stream) + { + var colIndex = result.Stream.Schema.GetFieldIndex(columnNameField); + if (colIndex >= 0) + { + batch = stream.ReadNextRecordBatchAsync(cancellationToken).Result; Review Comment: (same note about the result being guaranteed to contain only a single batch) ########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +590,137 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + private async Task<QueryResult> GetColumnsExtendedAsync(CancellationToken cancellationToken = default) + { + // 1. Get all three results at once + var columnsResult = await GetColumnsAsync(cancellationToken); + if (columnsResult.Stream == null) return columnsResult; + + var pkResult = await GetPrimaryKeysAsync(cancellationToken); + + // For FK lookup, we need to pass in the current catalog/schema/table as the foreign table + var resp = await Connection.GetCrossReferenceAsync( + null, + null, + null, + CatalogName, + SchemaName, + TableName, + cancellationToken); + + var fkResult = await GetQueryResult(resp.DirectResults, cancellationToken); + + + // 2. Read all batches into memory + RecordBatch? columnsBatch = null; + RecordBatch? pkBatch = null; + RecordBatch? fkBatch = null; + + // Extract column data + using (var stream = columnsResult.Stream) + { + columnsBatch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (columnsBatch == null) return columnsResult; + } + + // 3. Find column name index + var colNameIndex = columnsResult.Stream.Schema.GetFieldIndex("COLUMN_NAME"); + if (colNameIndex < 0) return columnsResult; // Can't match without column names + + // Get column names + var colNames = (StringArray)columnsBatch.Column(colNameIndex); + + // 4. Create combined schema and prepare data + var allFields = new List<Field>(columnsResult.Stream.Schema.FieldsList); + var combinedData = new List<IArrowArray>(); + + // Add all columns data + for (int i = 0; i < columnsBatch.ColumnCount; i++) + { + combinedData.Add(columnsBatch.Column(i)); + } + + // 5. Process PK and FK data using helper methods with selected fields + ProcessRelationshipData(pkResult, "PK_", "COLUMN_NAME", + new[] { "COLUMN_NAME", "KEY_SEQ" }, // Selected PK fields + ref pkBatch, colNames, columnsBatch.Length, + ref allFields, combinedData, cancellationToken); + + ProcessRelationshipData(fkResult, "FK_", "FKCOLUMN_NAME", + new[] { "PKCOLUMN_NAME", "PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "FKCOLUMN_NAME" }, // Selected FK fields + ref fkBatch, colNames, columnsBatch.Length, + ref allFields, combinedData, cancellationToken); + + // 6. Return the combined result + var combinedSchema = new Schema(allFields, columnsResult.Stream.Schema.Metadata); + return new QueryResult(columnsBatch.Length, new HiveServer2Connection.HiveInfoArrowStream(combinedSchema, combinedData)); + } + + // Helper method to process relationship data (PK or FK) with selected fields + private void ProcessRelationshipData(QueryResult result, string prefix, string columnNameField, + string[] includeFields, ref RecordBatch? batch, StringArray colNames, int rowCount, + ref List<Field> allFields, List<IArrowArray> combinedData, CancellationToken cancellationToken) + { + if (result.Stream == null) return; + + // Build column map and read batch + Dictionary<string, int> map = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase); Review Comment: Column names will always be case-insensitive? There's no server-side setting for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org