CurtHagenlocher commented on code in PR #2766: URL: https://github.com/apache/arrow-adbc/pull/2766#discussion_r2077512196
########## csharp/test/Drivers/Databricks/StatementTests.cs: ########## @@ -122,6 +122,7 @@ public async Task CanGetCrossReferenceFromChildTableDatabricks() public async Task CanGetColumnsWithBaseTypeName() { var statement = Connection.CreateStatement(); + var res = Connection.GetInfo([AdbcInfoCode.VendorName, AdbcInfoCode.VendorVersion]); Review Comment: Is this just to validate that getting the vendor name and version doesn't raise an exception? ########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +592,370 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + // Helper method to read all batches from a stream + private async Task<(List<RecordBatch> Batches, Schema Schema, int TotalRows)> ReadAllBatchesAsync( + IArrowArrayStream stream, CancellationToken cancellationToken) + { + List<RecordBatch> batches = new List<RecordBatch>(); + int totalRows = 0; + Schema schema = stream.Schema; + + // Read all batches + while (true) + { + var batch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (batch == null) break; + + if (batch.Length > 0) + { + batches.Add(batch); + totalRows += batch.Length; + } + else + { + batch.Dispose(); + } + } + + return (batches, schema, totalRows); + } + + // Helper method to create empty string columns filled with nulls + private List<IArrowArray> CreateEmptyStringColumns(string[] fieldNames, int rowCount) + { + var result = new List<IArrowArray>(); + + foreach (var fieldName in fieldNames) + { + var builder = new StringArray.Builder(); + for (int i = 0; i < rowCount; i++) + { + builder.AppendNull(); + } + result.Add(builder.Build()); + } + + return result; + } + + // Helper class to manage builder creation and value appending + // This is only for metadata query result so we only have int or string types + private class TypedBuilder + { + private 
readonly IArrowArrayBuilder _builder; + private readonly ArrowTypeId _typeId; + + public TypedBuilder(ArrowTypeId typeId) + { + _typeId = typeId; + _builder = CreateBuilder(typeId); + } + + private static IArrowArrayBuilder CreateBuilder(ArrowTypeId typeId) => typeId switch + { + ArrowTypeId.Int16 => new Int16Array.Builder(), + ArrowTypeId.Int32 => new Int32Array.Builder(), + _ => new StringArray.Builder() // Default to string for unsupported types + }; + + public void AppendNull() + { + switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).AppendNull(); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).AppendNull(); + break; + default: + ((StringArray.Builder)_builder).AppendNull(); + break; + } + } + + public IArrowArray Build() => _typeId switch + { + ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(), + ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(), + _ => ((StringArray.Builder)_builder).Build() + }; + + public void AppendValue(IArrowArray columnArray, int rowIndex) + { + try + { + switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value); + break; + default: // Handles String and other types that default to StringArray.Builder + // Try to cast to StringArray and get string value - if it fails, + // the outer try-catch will handle it and call AppendNull() + ((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex)); + break; + } + } + catch + { + // If any conversion fails or columnArray is null, append null as fallback + AppendNull(); + } + } + + + } + + private async Task<QueryResult> GetColumnsExtendedAsync(CancellationToken cancellationToken = default) + { + // 1. 
Get all three results at once + var columnsResult = await GetColumnsAsync(cancellationToken); + if (columnsResult.Stream == null) return columnsResult; + + var pkResult = await GetPrimaryKeysAsync(cancellationToken); + + // For FK lookup, we need to pass in the current catalog/schema/table as the foreign table + var resp = await Connection.GetCrossReferenceAsync( + null, + null, + null, + CatalogName, + SchemaName, + TableName, + cancellationToken); + + var fkResult = await GetQueryResult(resp.DirectResults, cancellationToken); + + // 2. Read all batches into memory + List<RecordBatch> columnsBatches; + int totalRows; + Schema columnsSchema; + StringArray? columnNames = null; + int colNameIndex = -1; + + // Extract column data + using (var stream = columnsResult.Stream) + { + colNameIndex = stream.Schema.GetFieldIndex("COLUMN_NAME"); + if (colNameIndex < 0) return columnsResult; // Can't match without column names + + var batchResult = await ReadAllBatchesAsync(stream, cancellationToken); + columnsBatches = batchResult.Batches; + columnsSchema = batchResult.Schema; + totalRows = batchResult.TotalRows; + + if (columnsBatches.Count == 0) return columnsResult; + + // Create column names array from all batches + var builder = new StringArray.Builder(); + foreach (var batch in columnsBatches) + { + StringArray batchColNames = (StringArray)batch.Column(colNameIndex); + for (int i = 0; i < batch.Length; i++) + { + builder.Append(batchColNames.GetString(i)); + } + } + columnNames = builder.Build(); + } + + // 3. Create combined schema and prepare data + var allFields = new List<Field>(columnsSchema.FieldsList); + var combinedData = new List<IArrowArray>(); + + // 4. Add all columns data by combining all batches + // Create a combined array for each column across all batches + for (int colIdx = 0; colIdx < columnsSchema.FieldsList.Count; colIdx++) Review Comment: This, too, could use `ArrayDataConcatenator`. 
########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +592,370 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + // Helper method to read all batches from a stream + private async Task<(List<RecordBatch> Batches, Schema Schema, int TotalRows)> ReadAllBatchesAsync( + IArrowArrayStream stream, CancellationToken cancellationToken) + { + List<RecordBatch> batches = new List<RecordBatch>(); + int totalRows = 0; + Schema schema = stream.Schema; + + // Read all batches + while (true) + { + var batch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (batch == null) break; + + if (batch.Length > 0) + { + batches.Add(batch); + totalRows += batch.Length; + } + else + { + batch.Dispose(); + } + } + + return (batches, schema, totalRows); + } + + // Helper method to create empty string columns filled with nulls + private List<IArrowArray> CreateEmptyStringColumns(string[] fieldNames, int rowCount) + { + var result = new List<IArrowArray>(); + + foreach (var fieldName in fieldNames) + { + var builder = new StringArray.Builder(); + for (int i = 0; i < rowCount; i++) + { + builder.AppendNull(); + } + result.Add(builder.Build()); + } + + return result; + } + + // Helper class to manage builder creation and value appending + // This is only for metadata query result so we only have int or string types + private class TypedBuilder + { + private readonly IArrowArrayBuilder _builder; + private readonly ArrowTypeId _typeId; + + public TypedBuilder(ArrowTypeId typeId) + { + _typeId = typeId; + _builder = CreateBuilder(typeId); + } + + private static IArrowArrayBuilder CreateBuilder(ArrowTypeId typeId) => typeId switch + { + ArrowTypeId.Int16 => new Int16Array.Builder(), + ArrowTypeId.Int32 => new Int32Array.Builder(), + _ => new StringArray.Builder() // Default to string for unsupported 
types + }; + + public void AppendNull() + { + switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).AppendNull(); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).AppendNull(); + break; + default: + ((StringArray.Builder)_builder).AppendNull(); + break; + } + } + + public IArrowArray Build() => _typeId switch + { + ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(), + ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(), + _ => ((StringArray.Builder)_builder).Build() + }; + + public void AppendValue(IArrowArray columnArray, int rowIndex) + { + try + { + switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value); + break; + default: // Handles String and other types that default to StringArray.Builder + // Try to cast to StringArray and get string value - if it fails, + // the outer try-catch will handle it and call AppendNull() + ((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex)); + break; + } + } + catch + { + // If any conversion fails or columnArray is null, append null as fallback + AppendNull(); + } + } + + Review Comment: nit: remove blank lines ########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +592,370 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + // Helper method to read all batches from a stream + private async Task<(List<RecordBatch> Batches, Schema Schema, int TotalRows)> ReadAllBatchesAsync( + IArrowArrayStream stream, CancellationToken cancellationToken) + { + List<RecordBatch> batches = new List<RecordBatch>(); + int totalRows = 0; + Schema schema = 
stream.Schema; + + // Read all batches + while (true) + { + var batch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (batch == null) break; + + if (batch.Length > 0) + { + batches.Add(batch); + totalRows += batch.Length; + } + else + { + batch.Dispose(); + } + } + + return (batches, schema, totalRows); + } + + // Helper method to create empty string columns filled with nulls + private List<IArrowArray> CreateEmptyStringColumns(string[] fieldNames, int rowCount) + { + var result = new List<IArrowArray>(); + + foreach (var fieldName in fieldNames) + { + var builder = new StringArray.Builder(); + for (int i = 0; i < rowCount; i++) + { + builder.AppendNull(); + } + result.Add(builder.Build()); + } + + return result; + } + + // Helper class to manage builder creation and value appending + // This is only for metadata query result so we only have int or string types + private class TypedBuilder + { + private readonly IArrowArrayBuilder _builder; + private readonly ArrowTypeId _typeId; + + public TypedBuilder(ArrowTypeId typeId) + { + _typeId = typeId; + _builder = CreateBuilder(typeId); + } + + private static IArrowArrayBuilder CreateBuilder(ArrowTypeId typeId) => typeId switch + { + ArrowTypeId.Int16 => new Int16Array.Builder(), + ArrowTypeId.Int32 => new Int32Array.Builder(), + _ => new StringArray.Builder() // Default to string for unsupported types + }; + + public void AppendNull() + { + switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).AppendNull(); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).AppendNull(); + break; + default: + ((StringArray.Builder)_builder).AppendNull(); + break; + } + } + + public IArrowArray Build() => _typeId switch + { + ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(), + ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(), + _ => ((StringArray.Builder)_builder).Build() + }; + + public void AppendValue(IArrowArray columnArray, int rowIndex) + { + try + { 
+ switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value); + break; + default: // Handles String and other types that default to StringArray.Builder + // Try to cast to StringArray and get string value - if it fails, + // the outer try-catch will handle it and call AppendNull() + ((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex)); + break; + } + } + catch + { + // If any conversion fails or columnArray is null, append null as fallback + AppendNull(); + } + } + + + } + + private async Task<QueryResult> GetColumnsExtendedAsync(CancellationToken cancellationToken = default) + { + // 1. Get all three results at once + var columnsResult = await GetColumnsAsync(cancellationToken); + if (columnsResult.Stream == null) return columnsResult; + + var pkResult = await GetPrimaryKeysAsync(cancellationToken); + + // For FK lookup, we need to pass in the current catalog/schema/table as the foreign table + var resp = await Connection.GetCrossReferenceAsync( + null, + null, + null, + CatalogName, + SchemaName, + TableName, + cancellationToken); + + var fkResult = await GetQueryResult(resp.DirectResults, cancellationToken); + + // 2. Read all batches into memory + List<RecordBatch> columnsBatches; + int totalRows; + Schema columnsSchema; + StringArray? 
columnNames = null; + int colNameIndex = -1; + + // Extract column data + using (var stream = columnsResult.Stream) + { + colNameIndex = stream.Schema.GetFieldIndex("COLUMN_NAME"); + if (colNameIndex < 0) return columnsResult; // Can't match without column names + + var batchResult = await ReadAllBatchesAsync(stream, cancellationToken); + columnsBatches = batchResult.Batches; + columnsSchema = batchResult.Schema; + totalRows = batchResult.TotalRows; + + if (columnsBatches.Count == 0) return columnsResult; + + // Create column names array from all batches + var builder = new StringArray.Builder(); Review Comment: For what it's worth, there's an `ArrayDataConcatenator` in the base Arrow libraries that might be useful here; it would also be considerably more efficient than appending the values one at a time. It's used in `ListArrayExtensions` in this ADBC codebase. ########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +592,370 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + // Helper method to read all batches from a stream + private async Task<(List<RecordBatch> Batches, Schema Schema, int TotalRows)> ReadAllBatchesAsync( + IArrowArrayStream stream, CancellationToken cancellationToken) + { + List<RecordBatch> batches = new List<RecordBatch>(); + int totalRows = 0; + Schema schema = stream.Schema; + Review Comment: The whitespace linter is still complaining about lines like this one (blank lines that contain trailing whitespace). If you're using Visual Studio (not VS Code), you can autoformat the document with Control-K Control-D. If you're using VS Code, you can autoformat with Alt-Shift-F. Either one will remove the trailing spaces. Other editors may have similar commands.
########## csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs: ########## @@ -587,5 +592,370 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR return new QueryResult(rowCount, new HiveServer2Connection.HiveInfoArrowStream(enhancedSchema, enhancedData)); } + + // Helper method to read all batches from a stream + private async Task<(List<RecordBatch> Batches, Schema Schema, int TotalRows)> ReadAllBatchesAsync( + IArrowArrayStream stream, CancellationToken cancellationToken) + { + List<RecordBatch> batches = new List<RecordBatch>(); + int totalRows = 0; + Schema schema = stream.Schema; + + // Read all batches + while (true) + { + var batch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (batch == null) break; + + if (batch.Length > 0) + { + batches.Add(batch); + totalRows += batch.Length; + } + else + { + batch.Dispose(); + } + } + + return (batches, schema, totalRows); + } + + // Helper method to create empty string columns filled with nulls + private List<IArrowArray> CreateEmptyStringColumns(string[] fieldNames, int rowCount) + { + var result = new List<IArrowArray>(); + + foreach (var fieldName in fieldNames) + { + var builder = new StringArray.Builder(); + for (int i = 0; i < rowCount; i++) + { + builder.AppendNull(); + } + result.Add(builder.Build()); + } + + return result; + } + + // Helper class to manage builder creation and value appending + // This is only for metadata query result so we only have int or string types + private class TypedBuilder + { + private readonly IArrowArrayBuilder _builder; + private readonly ArrowTypeId _typeId; + + public TypedBuilder(ArrowTypeId typeId) + { + _typeId = typeId; + _builder = CreateBuilder(typeId); + } + + private static IArrowArrayBuilder CreateBuilder(ArrowTypeId typeId) => typeId switch + { + ArrowTypeId.Int16 => new Int16Array.Builder(), + ArrowTypeId.Int32 => new Int32Array.Builder(), + _ => new StringArray.Builder() // Default to string for unsupported 
types + }; + + public void AppendNull() + { + switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).AppendNull(); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).AppendNull(); + break; + default: + ((StringArray.Builder)_builder).AppendNull(); + break; + } + } + + public IArrowArray Build() => _typeId switch + { + ArrowTypeId.Int16 => ((Int16Array.Builder)_builder).Build(), + ArrowTypeId.Int32 => ((Int32Array.Builder)_builder).Build(), + _ => ((StringArray.Builder)_builder).Build() + }; + + public void AppendValue(IArrowArray columnArray, int rowIndex) + { + try + { + switch (_typeId) + { + case ArrowTypeId.Int16: + ((Int16Array.Builder)_builder).Append(((Int16Array)columnArray).GetValue(rowIndex)!.Value); + break; + case ArrowTypeId.Int32: + ((Int32Array.Builder)_builder).Append(((Int32Array)columnArray).GetValue(rowIndex)!.Value); + break; + default: // Handles String and other types that default to StringArray.Builder + // Try to cast to StringArray and get string value - if it fails, + // the outer try-catch will handle it and call AppendNull() + ((StringArray.Builder)_builder).Append(((StringArray)columnArray).GetString(rowIndex)); + break; + } + } + catch + { + // If any conversion fails or columnArray is null, append null as fallback + AppendNull(); + } + } + + + } + + private async Task<QueryResult> GetColumnsExtendedAsync(CancellationToken cancellationToken = default) + { + // 1. 
Get all three results at once + var columnsResult = await GetColumnsAsync(cancellationToken); + if (columnsResult.Stream == null) return columnsResult; + + var pkResult = await GetPrimaryKeysAsync(cancellationToken); + + // For FK lookup, we need to pass in the current catalog/schema/table as the foreign table + var resp = await Connection.GetCrossReferenceAsync( + null, + null, + null, + CatalogName, + SchemaName, + TableName, + cancellationToken); + + var fkResult = await GetQueryResult(resp.DirectResults, cancellationToken); + + // 2. Read all batches into memory + List<RecordBatch> columnsBatches; + int totalRows; + Schema columnsSchema; + StringArray? columnNames = null; + int colNameIndex = -1; + + // Extract column data + using (var stream = columnsResult.Stream) + { + colNameIndex = stream.Schema.GetFieldIndex("COLUMN_NAME"); + if (colNameIndex < 0) return columnsResult; // Can't match without column names + + var batchResult = await ReadAllBatchesAsync(stream, cancellationToken); + columnsBatches = batchResult.Batches; + columnsSchema = batchResult.Schema; + totalRows = batchResult.TotalRows; + + if (columnsBatches.Count == 0) return columnsResult; + + // Create column names array from all batches + var builder = new StringArray.Builder(); + foreach (var batch in columnsBatches) + { + StringArray batchColNames = (StringArray)batch.Column(colNameIndex); + for (int i = 0; i < batch.Length; i++) + { + builder.Append(batchColNames.GetString(i)); + } + } + columnNames = builder.Build(); + } + + // 3. Create combined schema and prepare data + var allFields = new List<Field>(columnsSchema.FieldsList); + var combinedData = new List<IArrowArray>(); + + // 4. 
Add all columns data by combining all batches + // Create a combined array for each column across all batches + for (int colIdx = 0; colIdx < columnsSchema.FieldsList.Count; colIdx++) + { + if (columnsBatches.Count == 0) + continue; + + var field = columnsSchema.GetFieldByIndex(colIdx); + var dataType = field.DataType; + + // Create a TypedBuilder that handles both creation and appending + var typedBuilder = new TypedBuilder(dataType.TypeId); + + foreach (var batch in columnsBatches) + { + var columnArray = batch.Column(colIdx); + for (int i = 0; i < columnArray.Length; i++) + { + typedBuilder.AppendValue(columnArray, i); + } + } + + combinedData.Add(typedBuilder.Build()); + } + + // 5. Process PK and FK data using helper methods with selected fields + await ProcessRelationshipData(pkResult, "PK_", "COLUMN_NAME", + new[] { "COLUMN_NAME" }, // Selected PK fields + columnNames, totalRows, + allFields, combinedData, cancellationToken); + + await ProcessRelationshipData(fkResult, "FK_", "FKCOLUMN_NAME", + new[] { "PKCOLUMN_NAME", "PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "FKCOLUMN_NAME" }, // Selected FK fields + columnNames, totalRows, + allFields, combinedData, cancellationToken); + + // 6. Return the combined result + var combinedSchema = new Schema(allFields, columnsSchema.Metadata); + + // 7. 
Clean up batches + foreach (var batch in columnsBatches) + { + batch.Dispose(); + } + + return new QueryResult(totalRows, new HiveServer2Connection.HiveInfoArrowStream(combinedSchema, combinedData)); + } + + // Helper method to process relationship data (PK or FK) with selected fields + private async Task ProcessRelationshipData(QueryResult result, string prefix, string columnNameField, + string[] includeFields, StringArray colNames, int rowCount, + List<Field> allFields, List<IArrowArray> combinedData, CancellationToken cancellationToken) + { + // First ensure we always add the fields to the schema, even if there's no data + foreach (var fieldName in includeFields) + { + allFields.Add(new Field(prefix + fieldName, StringType.Default, true)); + } + + Schema? streamSchema = null; + int relationColIndex = -1; + List<RecordBatch> relationBatches = new List<RecordBatch>(); + + // Early return with empty columns if there's no valid relationship data + bool hasValidData = result.Stream != null; + // Try to read schema and batches if we have a stream + if (hasValidData) + { + using (var stream = result.Stream) + { + streamSchema = stream!.Schema; + if (streamSchema != null) + { + relationColIndex = streamSchema.GetFieldIndex(columnNameField); + + // Check if key column exists + if (relationColIndex >= 0) + { + // Read all batches + var batchResult = await ReadAllBatchesAsync(stream, cancellationToken); + relationBatches = batchResult.Batches; + + // Update validity based on having batches + hasValidData = relationBatches.Count > 0; + } + else + { + hasValidData = false; + } + } + else + { + hasValidData = false; + } + } + } + + // If no valid data, add empty columns and return + if (!hasValidData) + { + var emptyColumns = CreateEmptyStringColumns(includeFields, rowCount); + combinedData.AddRange(emptyColumns); + return; + } + + // At this point, we know streamSchema is not null because hasValidData would be false otherwise + Dictionary<string, int> fieldIndexMap = new 
Dictionary<string, int>(StringComparer.OrdinalIgnoreCase); + foreach (var fieldName in includeFields) + { + int fieldIndex = streamSchema!.GetFieldIndex(fieldName); + if (fieldIndex >= 0) + { + fieldIndexMap[fieldName] = fieldIndex; + } + } + + // Build column map from relationship data + Dictionary<string, (int batchIndex, int rowIndex)> relationMap = new Dictionary<string, (int, int)>(StringComparer.OrdinalIgnoreCase); + for (int batchIndex = 0; batchIndex < relationBatches.Count; batchIndex++) + { + var batch = relationBatches[batchIndex]; + var keyColNames = (StringArray)batch.Column(relationColIndex); + for (int i = 0; i < batch.Length; i++) + { + string? colName = keyColNames.GetString(i); + if (!string.IsNullOrEmpty(colName)) + relationMap[colName] = (batchIndex, i); + } + } + + // Create builders for all fields + Dictionary<int, TypedBuilder> builders = new Dictionary<int, TypedBuilder>(); + foreach (var fieldName in includeFields) + { + // Use StringArray for relationship fields + builders[builders.Count] = new TypedBuilder(ArrowTypeId.String); + } + + // Process all batches + for (int i = 0; i < rowCount; i++) + { + string? 
colName = colNames.GetString(i); + if (!string.IsNullOrEmpty(colName) && relationMap.TryGetValue(colName, out var relation)) + { + // Process all fields for this match from the correct batch + var batch = relationBatches[relation.batchIndex]; + + int builderIndex = 0; + foreach (var fieldName in includeFields) + { + var builder = builders[builderIndex++]; + + if (!fieldIndexMap.TryGetValue(fieldName, out int fieldIndex) || + batch.Column(fieldIndex).IsNull(relation.rowIndex)) + { + // Use the specific null-handling code for each builder type instead of passing null + builder.AppendNull(); + } + else + { + builder.AppendValue(batch.Column(fieldIndex), relation.rowIndex); + } + } + } + else + { + // Add null values for each field for this unmatched row + foreach (var builder in builders.Values) + { + // Use the specific null-handling code for each builder type instead of passing null + builder.AppendNull(); + } + } + } + + // Add built arrays to combined data + foreach (var builder in builders.Values) + { + combinedData.Add(builder.Build()); + } + + // Clean up batches + foreach (var batch in relationBatches) + { + batch.Dispose(); + } + } } } + + Review Comment: nit: remove trailing line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org