jembishop opened a new issue, #1868:
URL: https://github.com/apache/iceberg-rust/issues/1868

   ### Apache Iceberg Rust version
   
   None
   
   ### Describe the bug
   
   It seems that if I use fast append on an Iceberg table with the Glue catalog, optimistic concurrency doesn't work: files registered by some of my processes get clobbered by other concurrent writers.
   
   My setup is an Iceberg table with the Glue catalog in AWS. I create Parquet files (not using iceberg-rust), then use iceberg-rust to register the uploaded files with Iceberg, retrying whenever an optimistic concurrency error is raised.
   
   
   ```rust
   async fn register_files_to_iceberg_batch(
       bucket_name: &str, region: &str, event_name: &str, is_test: bool,
       requests: Vec<IcebergRegistrationRequest>,
       credentials: &aws_credential_types::Credentials,
   ) -> anyhow::Result<()> {
       const RETRY_DURATION: std::time::Duration = std::time::Duration::from_secs(10);
       const MAX_ATTEMPTS: u32 = 100;
   
       for attempt in 0..MAX_ATTEMPTS {
           let catalog = create_glue_catalog(bucket_name, credentials)
               .await
               .with_context(|| format!("Failed to create Glue catalog on attempt {}", attempt))?;
   
           match register_files_to_iceberg_batch_once(bucket_name, region, event_name, is_test, &requests, &catalog).await
           {
               Ok(()) => return Ok(()),
               Err(err) => {
                   let is_commit_conflict = err
                       .downcast_ref::<iceberg::Error>()
                       .map(|e| e.kind() == ErrorKind::CatalogCommitConflicts)
                       .unwrap_or(false);
   
                   if attempt < MAX_ATTEMPTS - 1 {
                       if is_commit_conflict {
                           tracing::info!(
                               attempt,
                               error = &*err,
                               retry_duration = ?RETRY_DURATION,
                               "Iceberg commit conflict, retrying batch with 
fresh catalog"
                           );
                       } else {
                           tracing::error!(
                               attempt,
                               error = &*err,
                               retry_duration = ?RETRY_DURATION,
                               "Error registering files to Iceberg, retrying 
batch with fresh catalog"
                           );
                       }
                       tokio::time::sleep(RETRY_DURATION).await;
                   } else {
                       bail!("Max retry attempts reached for Iceberg batch 
registration: {}", err);
                   }
               }
           }
       }
       Ok(())
   }
   async fn register_files_to_iceberg_batch_once(
       bucket_name: &str, region: &str, event_name: &str, is_test: bool,
       requests: &[IcebergRegistrationRequest],
       catalog: &iceberg_catalog_glue::GlueCatalog,
   ) -> anyhow::Result<()> {
       let start = std::time::Instant::now();
       let region_name = region_to_name(region);
       let table_name = format!("{}_{}_iceberg", region_name, event_name);
       let database_name = if is_test { "test_market_data" } else { "market_data" };
   
       tracing::info!(
           table_name,
           file_count = requests.len(),
           "Registering files to Iceberg table"
       );
   
       let table_ident = TableIdent::new(
           NamespaceIdent::new(database_name.to_string()),
           table_name.clone(),
       );
       let table = catalog.load_table(&table_ident).await?;
   
       let partition_spec = table.metadata().default_partition_spec();
   
       // Build data files for all requests
       let mut data_files = Vec::new();
       for request in requests {
           let full_path = format!(
               "s3://{}/{}",
               bucket_name,
               request.s3_key.to_string_with_id(&request.file_id)
           );
   
           let mut partition_values = Vec::new();
           for field in partition_spec.fields() {
               let source_field = table
                   .metadata()
                   .current_schema()
                   .field_by_id(field.source_id)
                   .context("Failed to get source field")?;
               let field_name = source_field.name.as_str();
   
               let literal = match field_name {
                   "InstrumentKind" => 
Some(Literal::string(request.s3_key.partition_info.instrument_kind.clone())),
                   "Exchange" => 
Some(Literal::string(request.s3_key.partition_info.exchange.clone())),
                   "Date" => {
                       let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
                       let days_since_epoch = request
                           .s3_key
                           .partition_info
                           .date
                           .signed_duration_since(epoch)
                           .num_days() as i32;
                       Some(Literal::date(days_since_epoch))
                   }
                   _ => None,
               };
               partition_values.push(literal);
           }
   
           let partition = Struct::from_iter(partition_values);
   
           let data_file = DataFileBuilder::default()
               .content(DataContentType::Data)
               .file_path(full_path.clone())
               .file_format(DataFileFormat::Parquet)
               .partition(partition)
               .record_count(request.metadata.record_count)
               .file_size_in_bytes(request.metadata.file_size)
               .partition_spec_id(table.metadata().default_partition_spec_id())
               .column_sizes(request.metadata.stats.column_sizes.clone())
               .value_counts(request.metadata.stats.value_counts.clone())
               .null_value_counts(request.metadata.stats.null_value_counts.clone())
               .lower_bounds(request.metadata.stats.lower_bounds.clone())
               .upper_bounds(request.metadata.stats.upper_bounds.clone())
               .build()?;
   
           data_files.push(data_file);
       }
   
       // Single commit with all data files
       let tx = Transaction::new(&table);
       let append_action = tx.fast_append().with_check_duplicate(false).add_data_files(data_files);
       let tx = append_action.apply(tx)?;
       tx.commit(catalog).await?;
   
       let committed_files: Vec<String> = requests
           .iter()
           .map(|r| r.s3_key.to_string_with_id(&r.file_id))
           .collect();
   
       tracing::info!(
           table = table_name,
           file_count = requests.len(),
           elapsed_ms = start.elapsed().as_millis(),
           files = ?committed_files,
           "Successfully registered files to Iceberg in single commit"
       );
   
       Ok(())
   }
   
   ```
   
   When running this, I don't actually observe any logs about Iceberg commit conflicts, and with concurrent writers I end up with a bunch of orphaned, unregistered files, which I believe is because optimistic concurrency conflicts are never raised.
   
   I have a branch [here](https://github.com/apache/iceberg-rust/compare/main...jembishop:iceberg-rust:version_fix) which sets the Glue table version ID on the update call, and that seems to solve the problem for me.
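   
   For reference, this is roughly the kind of conditional update I would expect the catalog to perform on commit. It is only a sketch of the idea (not the actual diff in the branch), assuming the `aws-sdk-glue` crate; `conditional_update_table` and `table_input` are placeholders for whatever the catalog already builds:
   
   ```rust
   use aws_sdk_glue::{types::TableInput, Client};
   
   /// Sketch only: pass the previously observed table version id back to
   /// UpdateTable so Glue can reject the commit if another writer got there first.
   async fn conditional_update_table(
       client: &Client,
       database: &str,
       table: &str,
       table_input: TableInput, // whatever TableInput the catalog already builds
   ) -> anyhow::Result<()> {
       // Read the current version id alongside the metadata we are committing against.
       let current = client
           .get_table()
           .database_name(database)
           .name(table)
           .send()
           .await?;
       let version_id = current
           .table()
           .and_then(|t| t.version_id())
           .map(str::to_owned);
   
       // Hand the version id back on UpdateTable. If another writer committed in
       // between, Glue should reject the call (with a ConcurrentModificationException,
       // as far as I can tell), which is what lets the retry loop above see a
       // conflict instead of silently overwriting the other writer's snapshot.
       let mut request = client
           .update_table()
           .database_name(database)
           .table_input(table_input);
       if let Some(v) = version_id {
           request = request.version_id(v);
       }
       request.send().await?;
       Ok(())
   }
   ```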
   
   Is this a bug or am I misusing the library? 
   
   Thanks
   
   ### To Reproduce
   
   Set up an Iceberg table in S3 with the AWS Glue catalog, then have multiple writers register Parquet files concurrently using the fast append action, each retrying on optimistic concurrency errors.
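   
   A rough driver for the repro, reusing `register_files_to_iceberg_batch` from the snippet above; the `futures::future::try_join_all` wiring and the per-writer `batches` argument are just illustrative:
   
   ```rust
   use futures::future::try_join_all;
   
   /// Sketch: run several writers against the same Glue-backed table at once.
   /// With working optimistic concurrency most of them should hit
   /// ErrorKind::CatalogCommitConflicts and retry; instead they all "succeed" and
   /// later commits drop the files added by earlier ones.
   async fn reproduce_concurrent_appends(
       bucket_name: &str,
       region: &str,
       event_name: &str,
       credentials: &aws_credential_types::Credentials,
       batches: Vec<Vec<IcebergRegistrationRequest>>, // one disjoint batch per writer
   ) -> anyhow::Result<()> {
       try_join_all(batches.into_iter().map(|batch| {
           register_files_to_iceberg_batch(bucket_name, region, event_name, false, batch, credentials)
       }))
       .await?;
   
       // Afterwards, diff the data files referenced by the table's current snapshot
       // against the Parquet objects in S3: the leftovers are the orphans.
       Ok(())
   }
   ```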
   
   ### Expected behavior
   
   Concurrent commits should raise commit-conflict errors so that writers retry and no files are lost. Instead, large numbers of orphaned files are left behind because the concurrent modification errors never occur.
   
   ### Willingness to contribute
   
   None

