CurtHagenlocher commented on code in PR #2766:
URL: https://github.com/apache/arrow-adbc/pull/2766#discussion_r2077512196
##########
csharp/test/Drivers/Databricks/StatementTests.cs:
##########
@@ -122,6 +122,7 @@ public async Task
CanGetCrossReferenceFromChildTableDatabricks()
public async Task CanGetColumnsWithBaseTypeName()
{
var statement = Connection.CreateStatement();
+ var res = Connection.GetInfo([AdbcInfoCode.VendorName,
AdbcInfoCode.VendorVersion]);
Review Comment:
Is this just to validate that getting the vendor name and version doesn't
raise an exception?
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -587,5 +592,370 @@ protected internal QueryResult
EnhanceGetColumnsResult(Schema originalSchema, IR
return new QueryResult(rowCount, new
HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData));
}
+
+ // Helper method to read all batches from a stream
+ private async Task<(List<RecordBatch> Batches, Schema Schema, int
TotalRows)> ReadAllBatchesAsync(
+ IArrowArrayStream stream, CancellationToken cancellationToken)
+ {
+ List<RecordBatch> batches = new List<RecordBatch>();
+ int totalRows = 0;
+ Schema schema = stream.Schema;
+
+ // Read all batches
+ while (true)
+ {
+ var batch = await
stream.ReadNextRecordBatchAsync(cancellationToken);
+ if (batch == null) break;
+
+ if (batch.Length > 0)
+ {
+ batches.Add(batch);
+ totalRows += batch.Length;
+ }
+ else
+ {
+ batch.Dispose();
+ }
+ }
+
+ return (batches, schema, totalRows);
+ }
+
+ // Helper method to create empty string columns filled with nulls
+ private List<IArrowArray> CreateEmptyStringColumns(string[]
fieldNames, int rowCount)
+ {
+ var result = new List<IArrowArray>();
+
+ foreach (var fieldName in fieldNames)
+ {
+ var builder = new StringArray.Builder();
+ for (int i = 0; i < rowCount; i++)
+ {
+ builder.AppendNull();
+ }
+ result.Add(builder.Build());
+ }
+
+ return result;
+ }
+
+ // Helper class to manage builder creation and value appending
+ // This is only for metadata query result so we only have int or
string types
+ private class TypedBuilder
+ {
+ private readonly IArrowArrayBuilder _builder;
+ private readonly ArrowTypeId _typeId;
+
+ public TypedBuilder(ArrowTypeId typeId)
+ {
+ _typeId = typeId;
+ _builder = CreateBuilder(typeId);
+ }
+
+ private static IArrowArrayBuilder CreateBuilder(ArrowTypeId
typeId) => typeId switch
+ {
+ ArrowTypeId.Int16 => new Int16Array.Builder(),
+ ArrowTypeId.Int32 => new Int32Array.Builder(),
+ _ => new StringArray.Builder() // Default to string for
unsupported types
+ };
+
+ public void AppendNull()
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+ ((Int16Array.Builder)_builder).AppendNull();
+ break;
+ case ArrowTypeId.Int32:
+ ((Int32Array.Builder)_builder).AppendNull();
+ break;
+ default:
+ ((StringArray.Builder)_builder).AppendNull();
+ break;
+ }
+ }
+
+ public IArrowArray Build() => _typeId switch
+ {
+ ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(),
+ ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(),
+ _ => ((StringArray.Builder)_builder).Build()
+ };
+
+ public void AppendValue(IArrowArray columnArray, int rowIndex)
+ {
+ try
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+
((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ case ArrowTypeId.Int32:
+
((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ default: // Handles String and other types that
default to StringArray.Builder
+ // Try to cast to StringArray and get string value
- if it fails,
+ // the outer try-catch will handle it and call
AppendNull()
+
((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex));
+ break;
+ }
+ }
+ catch
+ {
+ // If any conversion fails or columnArray is null, append
null as fallback
+ AppendNull();
+ }
+ }
+
+
+ }
+
+ private async Task<QueryResult>
GetColumnsExtendedAsync(CancellationToken cancellationToken = default)
+ {
+ // 1. Get all three results at once
+ var columnsResult = await GetColumnsAsync(cancellationToken);
+ if (columnsResult.Stream == null) return columnsResult;
+
+ var pkResult = await GetPrimaryKeysAsync(cancellationToken);
+
+ // For FK lookup, we need to pass in the current
catalog/schema/table as the foreign table
+ var resp = await Connection.GetCrossReferenceAsync(
+ null,
+ null,
+ null,
+ CatalogName,
+ SchemaName,
+ TableName,
+ cancellationToken);
+
+ var fkResult = await GetQueryResult(resp.DirectResults,
cancellationToken);
+
+ // 2. Read all batches into memory
+ List<RecordBatch> columnsBatches;
+ int totalRows;
+ Schema columnsSchema;
+ StringArray? columnNames = null;
+ int colNameIndex = -1;
+
+ // Extract column data
+ using (var stream = columnsResult.Stream)
+ {
+ colNameIndex = stream.Schema.GetFieldIndex("COLUMN_NAME");
+ if (colNameIndex < 0) return columnsResult; // Can't match
without column names
+
+ var batchResult = await ReadAllBatchesAsync(stream,
cancellationToken);
+ columnsBatches = batchResult.Batches;
+ columnsSchema = batchResult.Schema;
+ totalRows = batchResult.TotalRows;
+
+ if (columnsBatches.Count == 0) return columnsResult;
+
+ // Create column names array from all batches
+ var builder = new StringArray.Builder();
+ foreach (var batch in columnsBatches)
+ {
+ StringArray batchColNames =
(StringArray)batch.Column(colNameIndex);
+ for (int i = 0; i < batch.Length; i++)
+ {
+ builder.Append(batchColNames.GetString(i));
+ }
+ }
+ columnNames = builder.Build();
+ }
+
+ // 3. Create combined schema and prepare data
+ var allFields = new List<Field>(columnsSchema.FieldsList);
+ var combinedData = new List<IArrowArray>();
+
+ // 4. Add all columns data by combining all batches
+ // Create a combined array for each column across all batches
+ for (int colIdx = 0; colIdx < columnsSchema.FieldsList.Count;
colIdx++)
Review Comment:
This, too, could use `ArrayDataConcatenator`.
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -587,5 +592,370 @@ protected internal QueryResult
EnhanceGetColumnsResult(Schema originalSchema, IR
return new QueryResult(rowCount, new
HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData));
}
+
+ // Helper method to read all batches from a stream
+ private async Task<(List<RecordBatch> Batches, Schema Schema, int
TotalRows)> ReadAllBatchesAsync(
+ IArrowArrayStream stream, CancellationToken cancellationToken)
+ {
+ List<RecordBatch> batches = new List<RecordBatch>();
+ int totalRows = 0;
+ Schema schema = stream.Schema;
+
+ // Read all batches
+ while (true)
+ {
+ var batch = await
stream.ReadNextRecordBatchAsync(cancellationToken);
+ if (batch == null) break;
+
+ if (batch.Length > 0)
+ {
+ batches.Add(batch);
+ totalRows += batch.Length;
+ }
+ else
+ {
+ batch.Dispose();
+ }
+ }
+
+ return (batches, schema, totalRows);
+ }
+
+ // Helper method to create empty string columns filled with nulls
+ private List<IArrowArray> CreateEmptyStringColumns(string[]
fieldNames, int rowCount)
+ {
+ var result = new List<IArrowArray>();
+
+ foreach (var fieldName in fieldNames)
+ {
+ var builder = new StringArray.Builder();
+ for (int i = 0; i < rowCount; i++)
+ {
+ builder.AppendNull();
+ }
+ result.Add(builder.Build());
+ }
+
+ return result;
+ }
+
+ // Helper class to manage builder creation and value appending
+ // This is only for metadata query result so we only have int or
string types
+ private class TypedBuilder
+ {
+ private readonly IArrowArrayBuilder _builder;
+ private readonly ArrowTypeId _typeId;
+
+ public TypedBuilder(ArrowTypeId typeId)
+ {
+ _typeId = typeId;
+ _builder = CreateBuilder(typeId);
+ }
+
+ private static IArrowArrayBuilder CreateBuilder(ArrowTypeId
typeId) => typeId switch
+ {
+ ArrowTypeId.Int16 => new Int16Array.Builder(),
+ ArrowTypeId.Int32 => new Int32Array.Builder(),
+ _ => new StringArray.Builder() // Default to string for
unsupported types
+ };
+
+ public void AppendNull()
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+ ((Int16Array.Builder)_builder).AppendNull();
+ break;
+ case ArrowTypeId.Int32:
+ ((Int32Array.Builder)_builder).AppendNull();
+ break;
+ default:
+ ((StringArray.Builder)_builder).AppendNull();
+ break;
+ }
+ }
+
+ public IArrowArray Build() => _typeId switch
+ {
+ ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(),
+ ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(),
+ _ => ((StringArray.Builder)_builder).Build()
+ };
+
+ public void AppendValue(IArrowArray columnArray, int rowIndex)
+ {
+ try
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+
((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ case ArrowTypeId.Int32:
+
((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ default: // Handles String and other types that
default to StringArray.Builder
+ // Try to cast to StringArray and get string value
- if it fails,
+ // the outer try-catch will handle it and call
AppendNull()
+
((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex));
+ break;
+ }
+ }
+ catch
+ {
+ // If any conversion fails or columnArray is null, append
null as fallback
+ AppendNull();
+ }
+ }
+
+
Review Comment:
nit: remove blank lines
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -587,5 +592,370 @@ protected internal QueryResult
EnhanceGetColumnsResult(Schema originalSchema, IR
return new QueryResult(rowCount, new
HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData));
}
+
+ // Helper method to read all batches from a stream
+ private async Task<(List<RecordBatch> Batches, Schema Schema, int
TotalRows)> ReadAllBatchesAsync(
+ IArrowArrayStream stream, CancellationToken cancellationToken)
+ {
+ List<RecordBatch> batches = new List<RecordBatch>();
+ int totalRows = 0;
+ Schema schema = stream.Schema;
+
+ // Read all batches
+ while (true)
+ {
+ var batch = await
stream.ReadNextRecordBatchAsync(cancellationToken);
+ if (batch == null) break;
+
+ if (batch.Length > 0)
+ {
+ batches.Add(batch);
+ totalRows += batch.Length;
+ }
+ else
+ {
+ batch.Dispose();
+ }
+ }
+
+ return (batches, schema, totalRows);
+ }
+
+ // Helper method to create empty string columns filled with nulls
+ private List<IArrowArray> CreateEmptyStringColumns(string[]
fieldNames, int rowCount)
+ {
+ var result = new List<IArrowArray>();
+
+ foreach (var fieldName in fieldNames)
+ {
+ var builder = new StringArray.Builder();
+ for (int i = 0; i < rowCount; i++)
+ {
+ builder.AppendNull();
+ }
+ result.Add(builder.Build());
+ }
+
+ return result;
+ }
+
+ // Helper class to manage builder creation and value appending
+ // This is only for metadata query result so we only have int or
string types
+ private class TypedBuilder
+ {
+ private readonly IArrowArrayBuilder _builder;
+ private readonly ArrowTypeId _typeId;
+
+ public TypedBuilder(ArrowTypeId typeId)
+ {
+ _typeId = typeId;
+ _builder = CreateBuilder(typeId);
+ }
+
+ private static IArrowArrayBuilder CreateBuilder(ArrowTypeId
typeId) => typeId switch
+ {
+ ArrowTypeId.Int16 => new Int16Array.Builder(),
+ ArrowTypeId.Int32 => new Int32Array.Builder(),
+ _ => new StringArray.Builder() // Default to string for
unsupported types
+ };
+
+ public void AppendNull()
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+ ((Int16Array.Builder)_builder).AppendNull();
+ break;
+ case ArrowTypeId.Int32:
+ ((Int32Array.Builder)_builder).AppendNull();
+ break;
+ default:
+ ((StringArray.Builder)_builder).AppendNull();
+ break;
+ }
+ }
+
+ public IArrowArray Build() => _typeId switch
+ {
+ ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(),
+ ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(),
+ _ => ((StringArray.Builder)_builder).Build()
+ };
+
+ public void AppendValue(IArrowArray columnArray, int rowIndex)
+ {
+ try
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+
((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ case ArrowTypeId.Int32:
+
((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ default: // Handles String and other types that
default to StringArray.Builder
+ // Try to cast to StringArray and get string value
- if it fails,
+ // the outer try-catch will handle it and call
AppendNull()
+
((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex));
+ break;
+ }
+ }
+ catch
+ {
+ // If any conversion fails or columnArray is null, append
null as fallback
+ AppendNull();
+ }
+ }
+
+
+ }
+
+ private async Task<QueryResult>
GetColumnsExtendedAsync(CancellationToken cancellationToken = default)
+ {
+ // 1. Get all three results at once
+ var columnsResult = await GetColumnsAsync(cancellationToken);
+ if (columnsResult.Stream == null) return columnsResult;
+
+ var pkResult = await GetPrimaryKeysAsync(cancellationToken);
+
+ // For FK lookup, we need to pass in the current
catalog/schema/table as the foreign table
+ var resp = await Connection.GetCrossReferenceAsync(
+ null,
+ null,
+ null,
+ CatalogName,
+ SchemaName,
+ TableName,
+ cancellationToken);
+
+ var fkResult = await GetQueryResult(resp.DirectResults,
cancellationToken);
+
+ // 2. Read all batches into memory
+ List<RecordBatch> columnsBatches;
+ int totalRows;
+ Schema columnsSchema;
+ StringArray? columnNames = null;
+ int colNameIndex = -1;
+
+ // Extract column data
+ using (var stream = columnsResult.Stream)
+ {
+ colNameIndex = stream.Schema.GetFieldIndex("COLUMN_NAME");
+ if (colNameIndex < 0) return columnsResult; // Can't match
without column names
+
+ var batchResult = await ReadAllBatchesAsync(stream,
cancellationToken);
+ columnsBatches = batchResult.Batches;
+ columnsSchema = batchResult.Schema;
+ totalRows = batchResult.TotalRows;
+
+ if (columnsBatches.Count == 0) return columnsResult;
+
+ // Create column names array from all batches
+ var builder = new StringArray.Builder();
Review Comment:
For what it's worth, there's an `ArrayDataConcatenator` in the base Arrow
libraries that might be useful here — and it's considerably more efficient
as well. It's used in `ListArrayExtensions` in this ADBC codebase.
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -587,5 +592,370 @@ protected internal QueryResult
EnhanceGetColumnsResult(Schema originalSchema, IR
return new QueryResult(rowCount, new
HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData));
}
+
+ // Helper method to read all batches from a stream
+ private async Task<(List<RecordBatch> Batches, Schema Schema, int
TotalRows)> ReadAllBatchesAsync(
+ IArrowArrayStream stream, CancellationToken cancellationToken)
+ {
+ List<RecordBatch> batches = new List<RecordBatch>();
+ int totalRows = 0;
+ Schema schema = stream.Schema;
+
Review Comment:
The whitespace linter is still complaining about lines like this. If you're
using the original Visual Studio, you can autoformat with Control-K Control-D.
If you're using VS Code, you can autoformat with Alt-Shift-F. Either one will
remove the trailing spaces. Other editors may have similar commands.
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -587,5 +592,370 @@ protected internal QueryResult
EnhanceGetColumnsResult(Schema originalSchema, IR
return new QueryResult(rowCount, new
HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData));
}
+
+ // Helper method to read all batches from a stream
+ private async Task<(List<RecordBatch> Batches, Schema Schema, int
TotalRows)> ReadAllBatchesAsync(
+ IArrowArrayStream stream, CancellationToken cancellationToken)
+ {
+ List<RecordBatch> batches = new List<RecordBatch>();
+ int totalRows = 0;
+ Schema schema = stream.Schema;
+
+ // Read all batches
+ while (true)
+ {
+ var batch = await
stream.ReadNextRecordBatchAsync(cancellationToken);
+ if (batch == null) break;
+
+ if (batch.Length > 0)
+ {
+ batches.Add(batch);
+ totalRows += batch.Length;
+ }
+ else
+ {
+ batch.Dispose();
+ }
+ }
+
+ return (batches, schema, totalRows);
+ }
+
+ // Helper method to create empty string columns filled with nulls
+ private List<IArrowArray> CreateEmptyStringColumns(string[]
fieldNames, int rowCount)
+ {
+ var result = new List<IArrowArray>();
+
+ foreach (var fieldName in fieldNames)
+ {
+ var builder = new StringArray.Builder();
+ for (int i = 0; i < rowCount; i++)
+ {
+ builder.AppendNull();
+ }
+ result.Add(builder.Build());
+ }
+
+ return result;
+ }
+
+ // Helper class to manage builder creation and value appending
+ // This is only for metadata query result so we only have int or
string types
+ private class TypedBuilder
+ {
+ private readonly IArrowArrayBuilder _builder;
+ private readonly ArrowTypeId _typeId;
+
+ public TypedBuilder(ArrowTypeId typeId)
+ {
+ _typeId = typeId;
+ _builder = CreateBuilder(typeId);
+ }
+
+ private static IArrowArrayBuilder CreateBuilder(ArrowTypeId
typeId) => typeId switch
+ {
+ ArrowTypeId.Int16 => new Int16Array.Builder(),
+ ArrowTypeId.Int32 => new Int32Array.Builder(),
+ _ => new StringArray.Builder() // Default to string for
unsupported types
+ };
+
+ public void AppendNull()
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+ ((Int16Array.Builder)_builder).AppendNull();
+ break;
+ case ArrowTypeId.Int32:
+ ((Int32Array.Builder)_builder).AppendNull();
+ break;
+ default:
+ ((StringArray.Builder)_builder).AppendNull();
+ break;
+ }
+ }
+
+ public IArrowArray Build() => _typeId switch
+ {
+ ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(),
+ ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(),
+ _ => ((StringArray.Builder)_builder).Build()
+ };
+
+ public void AppendValue(IArrowArray columnArray, int rowIndex)
+ {
+ try
+ {
+ switch (_typeId)
+ {
+ case ArrowTypeId.Int16:
+
((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ case ArrowTypeId.Int32:
+
((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value);
+ break;
+ default: // Handles String and other types that
default to StringArray.Builder
+ // Try to cast to StringArray and get string value
- if it fails,
+ // the outer try-catch will handle it and call
AppendNull()
+
((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex));
+ break;
+ }
+ }
+ catch
+ {
+ // If any conversion fails or columnArray is null, append
null as fallback
+ AppendNull();
+ }
+ }
+
+
+ }
+
+ private async Task<QueryResult>
GetColumnsExtendedAsync(CancellationToken cancellationToken = default)
+ {
+ // 1. Get all three results at once
+ var columnsResult = await GetColumnsAsync(cancellationToken);
+ if (columnsResult.Stream == null) return columnsResult;
+
+ var pkResult = await GetPrimaryKeysAsync(cancellationToken);
+
+ // For FK lookup, we need to pass in the current
catalog/schema/table as the foreign table
+ var resp = await Connection.GetCrossReferenceAsync(
+ null,
+ null,
+ null,
+ CatalogName,
+ SchemaName,
+ TableName,
+ cancellationToken);
+
+ var fkResult = await GetQueryResult(resp.DirectResults,
cancellationToken);
+
+ // 2. Read all batches into memory
+ List<RecordBatch> columnsBatches;
+ int totalRows;
+ Schema columnsSchema;
+ StringArray? columnNames = null;
+ int colNameIndex = -1;
+
+ // Extract column data
+ using (var stream = columnsResult.Stream)
+ {
+ colNameIndex = stream.Schema.GetFieldIndex("COLUMN_NAME");
+ if (colNameIndex < 0) return columnsResult; // Can't match
without column names
+
+ var batchResult = await ReadAllBatchesAsync(stream,
cancellationToken);
+ columnsBatches = batchResult.Batches;
+ columnsSchema = batchResult.Schema;
+ totalRows = batchResult.TotalRows;
+
+ if (columnsBatches.Count == 0) return columnsResult;
+
+ // Create column names array from all batches
+ var builder = new StringArray.Builder();
+ foreach (var batch in columnsBatches)
+ {
+ StringArray batchColNames =
(StringArray)batch.Column(colNameIndex);
+ for (int i = 0; i < batch.Length; i++)
+ {
+ builder.Append(batchColNames.GetString(i));
+ }
+ }
+ columnNames = builder.Build();
+ }
+
+ // 3. Create combined schema and prepare data
+ var allFields = new List<Field>(columnsSchema.FieldsList);
+ var combinedData = new List<IArrowArray>();
+
+ // 4. Add all columns data by combining all batches
+ // Create a combined array for each column across all batches
+ for (int colIdx = 0; colIdx < columnsSchema.FieldsList.Count;
colIdx++)
+ {
+ if (columnsBatches.Count == 0)
+ continue;
+
+ var field = columnsSchema.GetFieldByIndex(colIdx);
+ var dataType = field.DataType;
+
+ // Create a TypedBuilder that handles both creation and
appending
+ var typedBuilder = new TypedBuilder(dataType.TypeId);
+
+ foreach (var batch in columnsBatches)
+ {
+ var columnArray = batch.Column(colIdx);
+ for (int i = 0; i < columnArray.Length; i++)
+ {
+ typedBuilder.AppendValue(columnArray, i);
+ }
+ }
+
+ combinedData.Add(typedBuilder.Build());
+ }
+
+ // 5. Process PK and FK data using helper methods with selected
fields
+ await ProcessRelationshipData(pkResult, "PK_", "COLUMN_NAME",
+ new[] { "COLUMN_NAME" }, // Selected PK fields
+ columnNames, totalRows,
+ allFields, combinedData, cancellationToken);
+
+ await ProcessRelationshipData(fkResult, "FK_", "FKCOLUMN_NAME",
+ new[] { "PKCOLUMN_NAME", "PKTABLE_CAT", "PKTABLE_SCHEM",
"PKTABLE_NAME", "FKCOLUMN_NAME" }, // Selected FK fields
+ columnNames, totalRows,
+ allFields, combinedData, cancellationToken);
+
+ // 6. Return the combined result
+ var combinedSchema = new Schema(allFields, columnsSchema.Metadata);
+
+ // 7. Clean up batches
+ foreach (var batch in columnsBatches)
+ {
+ batch.Dispose();
+ }
+
+ return new QueryResult(totalRows, new
HiveServer2Connection.HiveInfoArrowStream(combinedSchema, combinedData));
+ }
+
+ // Helper method to process relationship data (PK or FK) with selected
fields
+ private async Task ProcessRelationshipData(QueryResult result, string
prefix, string columnNameField,
+ string[] includeFields, StringArray colNames, int rowCount,
+ List<Field> allFields, List<IArrowArray> combinedData,
CancellationToken cancellationToken)
+ {
+ // First ensure we always add the fields to the schema, even if
there's no data
+ foreach (var fieldName in includeFields)
+ {
+ allFields.Add(new Field(prefix + fieldName,
StringType.Default, true));
+ }
+
+ Schema? streamSchema = null;
+ int relationColIndex = -1;
+ List<RecordBatch> relationBatches = new List<RecordBatch>();
+
+ // Early return with empty columns if there's no valid
relationship data
+ bool hasValidData = result.Stream != null;
+ // Try to read schema and batches if we have a stream
+ if (hasValidData)
+ {
+ using (var stream = result.Stream)
+ {
+ streamSchema = stream!.Schema;
+ if (streamSchema != null)
+ {
+ relationColIndex =
streamSchema.GetFieldIndex(columnNameField);
+
+ // Check if key column exists
+ if (relationColIndex >= 0)
+ {
+ // Read all batches
+ var batchResult = await
ReadAllBatchesAsync(stream, cancellationToken);
+ relationBatches = batchResult.Batches;
+
+ // Update validity based on having batches
+ hasValidData = relationBatches.Count > 0;
+ }
+ else
+ {
+ hasValidData = false;
+ }
+ }
+ else
+ {
+ hasValidData = false;
+ }
+ }
+ }
+
+ // If no valid data, add empty columns and return
+ if (!hasValidData)
+ {
+ var emptyColumns = CreateEmptyStringColumns(includeFields,
rowCount);
+ combinedData.AddRange(emptyColumns);
+ return;
+ }
+
+ // At this point, we know streamSchema is not null because
hasValidData would be false otherwise
+ Dictionary<string, int> fieldIndexMap = new Dictionary<string,
int>(StringComparer.OrdinalIgnoreCase);
+ foreach (var fieldName in includeFields)
+ {
+ int fieldIndex = streamSchema!.GetFieldIndex(fieldName);
+ if (fieldIndex >= 0)
+ {
+ fieldIndexMap[fieldName] = fieldIndex;
+ }
+ }
+
+ // Build column map from relationship data
+ Dictionary<string, (int batchIndex, int rowIndex)> relationMap =
new Dictionary<string, (int, int)>(StringComparer.OrdinalIgnoreCase);
+ for (int batchIndex = 0; batchIndex < relationBatches.Count;
batchIndex++)
+ {
+ var batch = relationBatches[batchIndex];
+ var keyColNames = (StringArray)batch.Column(relationColIndex);
+ for (int i = 0; i < batch.Length; i++)
+ {
+ string? colName = keyColNames.GetString(i);
+ if (!string.IsNullOrEmpty(colName))
+ relationMap[colName] = (batchIndex, i);
+ }
+ }
+
+ // Create builders for all fields
+ Dictionary<int, TypedBuilder> builders = new Dictionary<int,
TypedBuilder>();
+ foreach (var fieldName in includeFields)
+ {
+ // Use StringArray for relationship fields
+ builders[builders.Count] = new
TypedBuilder(ArrowTypeId.String);
+ }
+
+ // Process all batches
+ for (int i = 0; i < rowCount; i++)
+ {
+ string? colName = colNames.GetString(i);
+ if (!string.IsNullOrEmpty(colName) &&
relationMap.TryGetValue(colName, out var relation))
+ {
+ // Process all fields for this match from the correct batch
+ var batch = relationBatches[relation.batchIndex];
+
+ int builderIndex = 0;
+ foreach (var fieldName in includeFields)
+ {
+ var builder = builders[builderIndex++];
+
+ if (!fieldIndexMap.TryGetValue(fieldName, out int
fieldIndex) ||
+ batch.Column(fieldIndex).IsNull(relation.rowIndex))
+ {
+ // Use the specific null-handling code for each
builder type instead of passing null
+ builder.AppendNull();
+ }
+ else
+ {
+ builder.AppendValue(batch.Column(fieldIndex),
relation.rowIndex);
+ }
+ }
+ }
+ else
+ {
+ // Add null values for each field for this unmatched row
+ foreach (var builder in builders.Values)
+ {
+ // Use the specific null-handling code for each
builder type instead of passing null
+ builder.AppendNull();
+ }
+ }
+ }
+
+ // Add built arrays to combined data
+ foreach (var builder in builders.Values)
+ {
+ combinedData.Add(builder.Build());
+ }
+
+ // Clean up batches
+ foreach (var batch in relationBatches)
+ {
+ batch.Dispose();
+ }
+ }
}
}
+
+
Review Comment:
nit: remove trailing line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]