This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/main by this push:
     new b5f8731b94 optional DDL execution in table output 2781 (#5902)
b5f8731b94 is described below

commit b5f8731b94505cdd275f0b48c156ce53c683a92f
Author: Bart Maertens <[email protected]>
AuthorDate: Sun Oct 26 17:26:07 2025 +0100

    optional DDL execution in table output 2781 (#5902)
    
    * initial refactoring to auto-execute DDL changes. #2781
    
    * create non-existing tables. #2781
    
    * new DDL options for Table Output. #2781
    
    * final changes for DDL in table output. fixes #2781
    
    * apache rat updates
---
 .../pages/pipeline/transforms/tableoutput.adoc     |  13 +
 .../database/0033-table-output-add-columns.hpl     | 165 +++++++
 .../database/0033-table-output-auto-create.hpl     | 146 ++++++
 .../database/0033-table-output-change-types.hpl    | 153 +++++++
 .../database/0033-table-output-drop-columns.hpl    | 141 ++++++
 .../database/0033-table-output-drop-recreate.hpl   | 152 +++++++
 .../database/main-0033-table-output-auto-ddl.hwf   | 296 +++++++++++++
 .../main-0034-table-output-add-columns.hwf         | 260 +++++++++++
 .../main-0035-table-output-drop-columns.hwf        | 267 +++++++++++
 .../main-0036-table-output-change-types.hwf        | 261 +++++++++++
 .../transforms/tableoutput/TableOutput.java        | 487 +++++++++++++++++++++
 .../transforms/tableoutput/TableOutputDialog.java  | 184 +++++++-
 .../transforms/tableoutput/TableOutputMeta.java    |  35 ++
 .../tableoutput/messages/messages_en_US.properties |  12 +
 .../tableoutput/TableOutputMetaTest.java           |  87 ++++
 .../transforms/tableoutput/TableOutputTest.java    | 120 +++++
 16 files changed, 2778 insertions(+), 1 deletion(-)

diff --git 
a/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/tableoutput.adoc 
b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/tableoutput.adoc
index 3b19f457e6..34d840a05a 100644
--- 
a/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/tableoutput.adoc
+++ 
b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/tableoutput.adoc
@@ -64,6 +64,19 @@ A maximum of 20 warnings will be logged however.
 This option is not available for batch inserts.
 |Specify database fields|Enable this option to specify the fields in the 
Database fields tab.
 Otherwise all fields are taken into account by default.
+|Automatically update table structure|Automatically manages table structure 
based on the incoming data stream.
+When enabled, the table is created if it doesn't exist, and additional options 
become available for column management.
+This option is incompatible with "Specify database fields".
+|Always drop and recreate table|Drops and recreates the table on every 
execution.
+Only available when "Automatically update table structure" is enabled.
+|Add columns|Adds columns to the table that exist in the incoming stream but 
not in the table.
+Only available when "Automatically update table structure" is enabled.
+|Drop non-existing columns|Drops columns from the table that don't exist in 
the incoming stream.
+Warning: This will result in data loss for the dropped columns.
+Only available when "Automatically update table structure" is enabled.
+|Change column data types|Changes column data types to match the incoming 
stream.
+Warning: Columns are dropped and recreated, resulting in data loss.
+Only available when "Automatically update table structure" is enabled.
 |Partition data over tables a|Use to split the data over multiple tables.
 For example instead of inserting all data into table SALES, put the data into 
tables SALES_200510, SALES_200511, SALES_200512, ... Use this on systems that 
don't have partitioned tables and/or don't allow inserts into UNION ALL views 
or the master of inherited tables.
 The view SALES allows you to report on the complete sales:
diff --git a/integration-tests/database/0033-table-output-add-columns.hpl 
b/integration-tests/database/0033-table-output-add-columns.hpl
new file mode 100644
index 0000000000..3fa32de37c
--- /dev/null
+++ b/integration-tests/database/0033-table-output-add-columns.hpl
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+  <info>
+    <name>0033-table-output-add-columns</name>
+    <name_sync_with_filename>Y</name_sync_with_filename>
+    <description>Test add missing columns functionality</description>
+    <extended_description/>
+    <pipeline_version/>
+    <pipeline_type>Normal</pipeline_type>
+    <parameters>
+    </parameters>
+    <capture_transform_performance>N</capture_transform_performance>
+    
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+    
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+    <created_user>-</created_user>
+    <created_date>2025/10/25 10:30:00.000</created_date>
+    <modified_user>-</modified_user>
+    <modified_date>2025/10/25 10:30:00.000</modified_date>
+    <key_for_session_key/>
+    <is_key_private>N</is_key_private>
+  </info>
+  <notepads>
+  </notepads>
+  <order>
+    <hop>
+      <from>Generate data with 4 columns</from>
+      <to>Add columns table output</to>
+      <enabled>Y</enabled>
+    </hop>
+  </order>
+  <transform>
+    <name>Generate data with 4 columns</name>
+    <type>RowGenerator</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>id</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Integer</type>
+        <nullif>1</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>name</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>String</type>
+        <nullif>Original</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>new_column_1</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>String</type>
+        <nullif>Added1</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>new_column_2</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Number</type>
+        <nullif>999.99</nullif>
+      </field>
+    </fields>
+    <interval_in_ms>5000</interval_in_ms>
+    <last_time_field>FiveSecondsAgo</last_time_field>
+    <never_ending>N</never_ending>
+    <limit>50</limit>
+    <row_time_field>now</row_time_field>
+    <attributes/>
+    <GUI>
+      <xloc>176</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>Add columns table output</name>
+    <type>TableOutput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <commit>1000</commit>
+    <connection>unit-test-db</connection>
+    <fields>
+    </fields>
+    <return_field/>
+    <ignore_errors>N</ignore_errors>
+    <only_when_have_rows>N</only_when_have_rows>
+    <partitioning_daily>N</partitioning_daily>
+    <partitioning_enabled>N</partitioning_enabled>
+    <partitioning_field/>
+    <partitioning_monthly>Y</partitioning_monthly>
+    <return_keys>N</return_keys>
+    <schema>public</schema>
+    <specify_fields>N</specify_fields>
+    <auto_update_table_structure>Y</auto_update_table_structure>
+    <always_drop_and_recreate>N</always_drop_and_recreate>
+    <add_columns>Y</add_columns>
+    <table>table_add_columns_test</table>
+    <tablename_field/>
+    <tablename_in_field>N</tablename_in_field>
+    <tablename_in_table>Y</tablename_in_table>
+    <truncate>N</truncate>
+    <use_batch>Y</use_batch>
+    <attributes/>
+    <GUI>
+      <xloc>544</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform_error_handling>
+  </transform_error_handling>
+  <attributes/>
+</pipeline>
+
diff --git a/integration-tests/database/0033-table-output-auto-create.hpl 
b/integration-tests/database/0033-table-output-auto-create.hpl
new file mode 100644
index 0000000000..beb1a6fee2
--- /dev/null
+++ b/integration-tests/database/0033-table-output-auto-create.hpl
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+  <info>
+    <name>0033-table-output-auto-create</name>
+    <name_sync_with_filename>Y</name_sync_with_filename>
+    <description>Test auto-create table functionality</description>
+    <extended_description/>
+    <pipeline_version/>
+    <pipeline_type>Normal</pipeline_type>
+    <parameters>
+    </parameters>
+    <capture_transform_performance>N</capture_transform_performance>
+    
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+    
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+    <created_user>-</created_user>
+    <created_date>2025/10/04 15:00:00.000</created_date>
+    <modified_user>-</modified_user>
+    <modified_date>2025/10/04 15:00:00.000</modified_date>
+  </info>
+  <notepads>
+  </notepads>
+  <order>
+    <hop>
+      <from>Generate test data</from>
+      <to>Auto-create table output</to>
+      <enabled>Y</enabled>
+    </hop>
+  </order>
+  <transform>
+    <name>Auto-create table output</name>
+    <type>TableOutput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <always_drop_and_recreate>N</always_drop_and_recreate>
+    <auto_update_table_structure>Y</auto_update_table_structure>
+    <commit>1000</commit>
+    <connection>unit-test-db</connection>
+    <fields>
+</fields>
+    <ignore_errors>N</ignore_errors>
+    <only_when_have_rows>N</only_when_have_rows>
+    <partitioning_daily>N</partitioning_daily>
+    <partitioning_enabled>N</partitioning_enabled>
+    <partitioning_monthly>Y</partitioning_monthly>
+    <return_keys>N</return_keys>
+    <schema>public</schema>
+    <specify_fields>N</specify_fields>
+    <table>table_auto_create_test</table>
+    <tablename_in_field>N</tablename_in_field>
+    <tablename_in_table>Y</tablename_in_table>
+    <truncate>N</truncate>
+    <use_batch>Y</use_batch>
+    <attributes/>
+    <GUI>
+      <xloc>544</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>Generate test data</name>
+    <type>RowGenerator</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>id</name>
+        <nullif>1</nullif>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Integer</type>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>15</length>
+        <name>name</name>
+        <nullif>Test</nullif>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>String</type>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>value</name>
+        <nullif>123.45</nullif>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Number</type>
+      </field>
+    </fields>
+    <interval_in_ms>5000</interval_in_ms>
+    <last_time_field>FiveSecondsAgo</last_time_field>
+    <limit>100</limit>
+    <never_ending>N</never_ending>
+    <row_time_field>now</row_time_field>
+    <attributes/>
+    <GUI>
+      <xloc>176</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform_error_handling>
+  </transform_error_handling>
+  <attributes/>
+</pipeline>
diff --git a/integration-tests/database/0033-table-output-change-types.hpl 
b/integration-tests/database/0033-table-output-change-types.hpl
new file mode 100644
index 0000000000..5ceef78d1b
--- /dev/null
+++ b/integration-tests/database/0033-table-output-change-types.hpl
@@ -0,0 +1,153 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+  <info>
+    <name>0033-table-output-change-types</name>
+    <name_sync_with_filename>Y</name_sync_with_filename>
+    <description>Test change column data types functionality</description>
+    <extended_description/>
+    <pipeline_version/>
+    <pipeline_type>Normal</pipeline_type>
+    <parameters>
+    </parameters>
+    <capture_transform_performance>N</capture_transform_performance>
+    
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+    
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+    <created_user>-</created_user>
+    <created_date>2025/10/25 11:30:00.000</created_date>
+    <modified_user>-</modified_user>
+    <modified_date>2025/10/25 11:30:00.000</modified_date>
+    <key_for_session_key/>
+    <is_key_private>N</is_key_private>
+  </info>
+  <notepads>
+  </notepads>
+  <order>
+    <hop>
+      <from>Generate data with new types</from>
+      <to>Change types table output</to>
+      <enabled>Y</enabled>
+    </hop>
+  </order>
+  <transform>
+    <name>Generate data with new types</name>
+    <type>RowGenerator</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>id</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Integer</type>
+        <nullif>1</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>age</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Integer</type>
+        <nullif>25</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>name</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>String</type>
+        <nullif>TypeChanged</nullif>
+      </field>
+    </fields>
+    <interval_in_ms>5000</interval_in_ms>
+    <last_time_field>FiveSecondsAgo</last_time_field>
+    <never_ending>N</never_ending>
+    <limit>50</limit>
+    <row_time_field>now</row_time_field>
+    <attributes/>
+    <GUI>
+      <xloc>176</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>Change types table output</name>
+    <type>TableOutput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <commit>1000</commit>
+    <connection>unit-test-db</connection>
+    <fields>
+    </fields>
+    <return_field/>
+    <ignore_errors>N</ignore_errors>
+    <only_when_have_rows>N</only_when_have_rows>
+    <partitioning_daily>N</partitioning_daily>
+    <partitioning_enabled>N</partitioning_enabled>
+    <partitioning_field/>
+    <partitioning_monthly>Y</partitioning_monthly>
+    <return_keys>N</return_keys>
+    <schema>public</schema>
+    <specify_fields>N</specify_fields>
+    <auto_update_table_structure>Y</auto_update_table_structure>
+    <always_drop_and_recreate>N</always_drop_and_recreate>
+    <change_column_types>Y</change_column_types>
+    <table>table_change_types_test</table>
+    <tablename_field/>
+    <tablename_in_field>N</tablename_in_field>
+    <tablename_in_table>Y</tablename_in_table>
+    <truncate>N</truncate>
+    <use_batch>Y</use_batch>
+    <attributes/>
+    <GUI>
+      <xloc>544</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform_error_handling>
+  </transform_error_handling>
+  <attributes/>
+</pipeline>
+
diff --git a/integration-tests/database/0033-table-output-drop-columns.hpl 
b/integration-tests/database/0033-table-output-drop-columns.hpl
new file mode 100644
index 0000000000..a9d6b542ef
--- /dev/null
+++ b/integration-tests/database/0033-table-output-drop-columns.hpl
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+  <info>
+    <name>0033-table-output-drop-columns</name>
+    <name_sync_with_filename>Y</name_sync_with_filename>
+    <description>Test drop surplus columns functionality</description>
+    <extended_description/>
+    <pipeline_version/>
+    <pipeline_type>Normal</pipeline_type>
+    <parameters>
+    </parameters>
+    <capture_transform_performance>N</capture_transform_performance>
+    
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+    
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+    <created_user>-</created_user>
+    <created_date>2025/10/25 11:00:00.000</created_date>
+    <modified_user>-</modified_user>
+    <modified_date>2025/10/25 11:00:00.000</modified_date>
+    <key_for_session_key/>
+    <is_key_private>N</is_key_private>
+  </info>
+  <notepads>
+  </notepads>
+  <order>
+    <hop>
+      <from>Generate data with 2 columns</from>
+      <to>Drop columns table output</to>
+      <enabled>Y</enabled>
+    </hop>
+  </order>
+  <transform>
+    <name>Generate data with 2 columns</name>
+    <type>RowGenerator</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>id</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Integer</type>
+        <nullif>1</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>name</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>String</type>
+        <nullif>Reduced</nullif>
+      </field>
+    </fields>
+    <interval_in_ms>5000</interval_in_ms>
+    <last_time_field>FiveSecondsAgo</last_time_field>
+    <never_ending>N</never_ending>
+    <limit>50</limit>
+    <row_time_field>now</row_time_field>
+    <attributes/>
+    <GUI>
+      <xloc>176</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>Drop columns table output</name>
+    <type>TableOutput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <commit>1000</commit>
+    <connection>unit-test-db</connection>
+    <fields>
+    </fields>
+    <return_field/>
+    <ignore_errors>N</ignore_errors>
+    <only_when_have_rows>N</only_when_have_rows>
+    <partitioning_daily>N</partitioning_daily>
+    <partitioning_enabled>N</partitioning_enabled>
+    <partitioning_field/>
+    <partitioning_monthly>Y</partitioning_monthly>
+    <return_keys>N</return_keys>
+    <schema>public</schema>
+    <specify_fields>N</specify_fields>
+    <auto_update_table_structure>Y</auto_update_table_structure>
+    <always_drop_and_recreate>N</always_drop_and_recreate>
+    <drop_columns>Y</drop_columns>
+    <table>table_drop_columns_test</table>
+    <tablename_field/>
+    <tablename_in_field>N</tablename_in_field>
+    <tablename_in_table>Y</tablename_in_table>
+    <truncate>N</truncate>
+    <use_batch>Y</use_batch>
+    <attributes/>
+    <GUI>
+      <xloc>544</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform_error_handling>
+  </transform_error_handling>
+  <attributes/>
+</pipeline>
+
diff --git a/integration-tests/database/0033-table-output-drop-recreate.hpl 
b/integration-tests/database/0033-table-output-drop-recreate.hpl
new file mode 100644
index 0000000000..4b0f3433fe
--- /dev/null
+++ b/integration-tests/database/0033-table-output-drop-recreate.hpl
@@ -0,0 +1,152 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+  <info>
+    <name>0033-table-output-drop-recreate</name>
+    <name_sync_with_filename>Y</name_sync_with_filename>
+    <description>Test drop and recreate table functionality</description>
+    <extended_description/>
+    <pipeline_version/>
+    <pipeline_type>Normal</pipeline_type>
+    <parameters>
+    </parameters>
+    <capture_transform_performance>N</capture_transform_performance>
+    
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+    
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+    <created_user>-</created_user>
+    <created_date>2025/10/04 15:00:00.000</created_date>
+    <modified_user>-</modified_user>
+    <modified_date>2025/10/04 15:00:00.000</modified_date>
+    <key_for_session_key/>
+    <is_key_private>N</is_key_private>
+  </info>
+  <notepads>
+  </notepads>
+  <order>
+    <hop>
+      <from>Generate test data</from>
+      <to>Drop and recreate table output</to>
+      <enabled>Y</enabled>
+    </hop>
+  </order>
+  <transform>
+    <name>Generate test data</name>
+    <type>RowGenerator</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>id</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Integer</type>
+        <nullif>1</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>name</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>String</type>
+        <nullif>RecreateTest</nullif>
+      </field>
+      <field>
+        <currency/>
+        <decimal/>
+        <format/>
+        <group/>
+        <length>-1</length>
+        <name>amount</name>
+        <precision>-1</precision>
+        <set_empty_string>N</set_empty_string>
+        <type>Number</type>
+        <nullif>999.99</nullif>
+      </field>
+    </fields>
+    <interval_in_ms>5000</interval_in_ms>
+    <last_time_field>FiveSecondsAgo</last_time_field>
+    <never_ending>N</never_ending>
+    <limit>50</limit>
+    <row_time_field>now</row_time_field>
+    <attributes/>
+    <GUI>
+      <xloc>176</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>Drop and recreate table output</name>
+    <type>TableOutput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <commit>1000</commit>
+    <connection>unit-test-db</connection>
+    <fields>
+    </fields>
+    <return_field/>
+    <ignore_errors>N</ignore_errors>
+    <only_when_have_rows>N</only_when_have_rows>
+    <partitioning_daily>N</partitioning_daily>
+    <partitioning_enabled>N</partitioning_enabled>
+    <partitioning_field/>
+    <partitioning_monthly>Y</partitioning_monthly>
+    <return_keys>N</return_keys>
+    <schema>public</schema>
+    <specify_fields>N</specify_fields>
+    <auto_update_table_structure>Y</auto_update_table_structure>
+    <always_drop_and_recreate>Y</always_drop_and_recreate>
+    <table>table_drop_recreate_test</table>
+    <tablename_field/>
+    <tablename_in_field>N</tablename_in_field>
+    <tablename_in_table>Y</tablename_in_table>
+    <truncate>N</truncate>
+    <use_batch>Y</use_batch>
+    <attributes/>
+    <GUI>
+      <xloc>544</xloc>
+      <yloc>96</yloc>
+    </GUI>
+  </transform>
+  <transform_error_handling>
+  </transform_error_handling>
+  <attributes/>
+</pipeline>
+
diff --git a/integration-tests/database/main-0033-table-output-auto-ddl.hwf 
b/integration-tests/database/main-0033-table-output-auto-ddl.hwf
new file mode 100644
index 0000000000..650adaeb05
--- /dev/null
+++ b/integration-tests/database/main-0033-table-output-auto-ddl.hwf
@@ -0,0 +1,296 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<workflow>
+  <name>main-0033-table-output-auto-ddl</name>
+  <name_sync_with_filename>Y</name_sync_with_filename>
+  <description>Test automatic DDL execution - Phase 1: Auto-create and 
drop/recreate</description>
+  <extended_description/>
+  <workflow_version/>
+  <created_user>-</created_user>
+  <created_date>2025/10/04 15:00:00.000</created_date>
+  <modified_user>-</modified_user>
+  <modified_date>2025/10/04 15:00:00.000</modified_date>
+  <parameters>
+    </parameters>
+  <actions>
+    <action>
+      <name>Start</name>
+      <description/>
+      <type>SPECIAL</type>
+      <attributes/>
+      <repeat>N</repeat>
+      <schedulerType>0</schedulerType>
+      <intervalSeconds>0</intervalSeconds>
+      <intervalMinutes>60</intervalMinutes>
+      <hour>12</hour>
+      <minutes>0</minutes>
+      <weekDay>1</weekDay>
+      <DayOfMonth>1</DayOfMonth>
+      <parallel>N</parallel>
+      <xloc>64</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Drop test tables</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_auto_create_test;
+DROP TABLE IF EXISTS public.table_drop_recreate_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>F</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>262</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>0033-table-output-auto-create.hpl</name>
+      <description/>
+      <type>PIPELINE</type>
+      <attributes/>
+      <filename>${PROJECT_HOME}/0033-table-output-auto-create.hpl</filename>
+      <params_from_previous>N</params_from_previous>
+      <exec_per_row>N</exec_per_row>
+      <clear_rows>N</clear_rows>
+      <clear_files>N</clear_files>
+      <set_logfile>N</set_logfile>
+      <logfile/>
+      <logext/>
+      <add_date>N</add_date>
+      <add_time>N</add_time>
+      <loglevel>Basic</loglevel>
+      <set_append_logfile>N</set_append_logfile>
+      <wait_until_finished>Y</wait_until_finished>
+      <create_parent_folder>N</create_parent_folder>
+      <run_configuration>local</run_configuration>
+      <parameters>
+        <pass_all_parameters>Y</pass_all_parameters>
+      </parameters>
+      <parallel>N</parallel>
+      <xloc>460</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Check 100 rows created</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname>public</schemaname>
+      <tablename>table_auto_create_test</tablename>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>100</limit>
+      <is_custom_sql>N</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql/>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>658</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>0033-table-output-drop-recreate.hpl</name>
+      <description/>
+      <type>PIPELINE</type>
+      <attributes/>
+      <filename>${PROJECT_HOME}/0033-table-output-drop-recreate.hpl</filename>
+      <params_from_previous>N</params_from_previous>
+      <exec_per_row>N</exec_per_row>
+      <clear_rows>N</clear_rows>
+      <clear_files>N</clear_files>
+      <set_logfile>N</set_logfile>
+      <logfile/>
+      <logext/>
+      <add_date>N</add_date>
+      <add_time>N</add_time>
+      <loglevel>Basic</loglevel>
+      <set_append_logfile>N</set_append_logfile>
+      <wait_until_finished>Y</wait_until_finished>
+      <create_parent_folder>N</create_parent_folder>
+      <run_configuration>local</run_configuration>
+      <parameters>
+        <pass_all_parameters>Y</pass_all_parameters>
+      </parameters>
+      <parallel>N</parallel>
+      <xloc>856</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Check 50 rows recreated</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname>public</schemaname>
+      <tablename>table_drop_recreate_test</tablename>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>50</limit>
+      <is_custom_sql>N</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql/>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>1056</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Run auto-create again (table exists)</name>
+      <description/>
+      <type>PIPELINE</type>
+      <attributes/>
+      <filename>${PROJECT_HOME}/0033-table-output-auto-create.hpl</filename>
+      <params_from_previous>N</params_from_previous>
+      <exec_per_row>N</exec_per_row>
+      <clear_rows>N</clear_rows>
+      <clear_files>N</clear_files>
+      <set_logfile>N</set_logfile>
+      <logfile/>
+      <logext/>
+      <add_date>N</add_date>
+      <add_time>N</add_time>
+      <loglevel>Basic</loglevel>
+      <set_append_logfile>N</set_append_logfile>
+      <wait_until_finished>Y</wait_until_finished>
+      <create_parent_folder>N</create_parent_folder>
+      <run_configuration>local</run_configuration>
+      <parameters>
+        <pass_all_parameters>Y</pass_all_parameters>
+      </parameters>
+      <parallel>N</parallel>
+      <xloc>1248</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Check 200 rows total</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname>public</schemaname>
+      <tablename>table_auto_create_test</tablename>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>200</limit>
+      <is_custom_sql>N</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql/>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>1450</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Cleanup test tables</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_auto_create_test;
+DROP TABLE IF EXISTS public.table_drop_recreate_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>F</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>1450</xloc>
+      <yloc>192</yloc>
+      <attributes_hac/>
+    </action>
+  </actions>
+  <hops>
+    <hop>
+      <from>Start</from>
+      <to>Drop test tables</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>Y</unconditional>
+    </hop>
+    <hop>
+      <from>Drop test tables</from>
+      <to>0033-table-output-auto-create.hpl</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>0033-table-output-auto-create.hpl</from>
+      <to>Check 100 rows created</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Check 100 rows created</from>
+      <to>0033-table-output-drop-recreate.hpl</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>0033-table-output-drop-recreate.hpl</from>
+      <to>Check 50 rows recreated</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Check 50 rows recreated</from>
+      <to>Run auto-create again (table exists)</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Run auto-create again (table exists)</from>
+      <to>Check 200 rows total</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Check 200 rows total</from>
+      <to>Cleanup test tables</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+  </hops>
+  <notepads>
+  </notepads>
+  <attributes/>
+</workflow>
+
diff --git a/integration-tests/database/main-0034-table-output-add-columns.hwf 
b/integration-tests/database/main-0034-table-output-add-columns.hwf
new file mode 100644
index 0000000000..b46845612a
--- /dev/null
+++ b/integration-tests/database/main-0034-table-output-add-columns.hwf
@@ -0,0 +1,260 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<workflow>
+  <name>main-0034-table-output-add-columns</name>
+  <name_sync_with_filename>Y</name_sync_with_filename>
+  <description>Test automatic column addition - Phase 2</description>
+  <extended_description/>
+  <workflow_version/>
+  <created_user>-</created_user>
+  <created_date>2025/10/25 10:30:00.000</created_date>
+  <modified_user>-</modified_user>
+  <modified_date>2025/10/25 10:30:00.000</modified_date>
+  <parameters>
+    </parameters>
+  <actions>
+    <action>
+      <name>Start</name>
+      <description/>
+      <type>SPECIAL</type>
+      <attributes/>
+      <repeat>N</repeat>
+      <schedulerType>0</schedulerType>
+      <intervalSeconds>0</intervalSeconds>
+      <intervalMinutes>60</intervalMinutes>
+      <hour>12</hour>
+      <minutes>0</minutes>
+      <weekDay>1</weekDay>
+      <DayOfMonth>1</DayOfMonth>
+      <parallel>N</parallel>
+      <xloc>64</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Drop test table</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_add_columns_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>262</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Create initial table with 2 columns</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+CREATE TABLE public.table_add_columns_test
+(
+    id INTEGER,
+    name VARCHAR(50)
+);
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>460</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>0033-table-output-add-columns.hpl</name>
+      <description/>
+      <type>PIPELINE</type>
+      <attributes/>
+      <filename>${PROJECT_HOME}/0033-table-output-add-columns.hpl</filename>
+      <params_from_previous>N</params_from_previous>
+      <exec_per_row>N</exec_per_row>
+      <clear_rows>N</clear_rows>
+      <clear_files>N</clear_files>
+      <set_logfile>N</set_logfile>
+      <logfile/>
+      <logext/>
+      <add_date>N</add_date>
+      <add_time>N</add_time>
+      <loglevel>Basic</loglevel>
+      <set_append_logfile>N</set_append_logfile>
+      <wait_until_finished>Y</wait_until_finished>
+      <create_parent_folder>N</create_parent_folder>
+      <run_configuration>local</run_configuration>
+      <parameters>
+        <pass_all_parameters>Y</pass_all_parameters>
+      </parameters>
+      <parallel>N</parallel>
+      <xloc>658</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Check 50 rows inserted</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname>public</schemaname>
+      <tablename>table_add_columns_test</tablename>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>50</limit>
+      <is_custom_sql>N</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql/>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>856</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Verify 4 columns exist</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+-- This query should return 4 rows (one for each column)
+-- If it doesn't, the test will fail
+SELECT column_name 
+FROM information_schema.columns 
+WHERE table_schema = 'public' 
+  AND table_name = 'table_add_columns_test'
+  AND column_name IN ('id', 'name', 'new_column_1', 'new_column_2')
+ORDER BY column_name;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>1056</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Verify data in new columns</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname/>
+      <tablename/>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>50</limit>
+      <is_custom_sql>Y</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql>SELECT * FROM public.table_add_columns_test 
+WHERE new_column_1 = 'Added1' 
+  AND new_column_2 = 999.99</custom_sql>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>1248</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Cleanup test table</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_add_columns_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>1248</xloc>
+      <yloc>192</yloc>
+      <attributes_hac/>
+    </action>
+  </actions>
+  <hops>
+    <hop>
+      <from>Start</from>
+      <to>Drop test table</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>Y</unconditional>
+    </hop>
+    <hop>
+      <from>Drop test table</from>
+      <to>Create initial table with 2 columns</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Create initial table with 2 columns</from>
+      <to>0033-table-output-add-columns.hpl</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>0033-table-output-add-columns.hpl</from>
+      <to>Check 50 rows inserted</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Check 50 rows inserted</from>
+      <to>Verify 4 columns exist</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Verify 4 columns exist</from>
+      <to>Verify data in new columns</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Verify data in new columns</from>
+      <to>Cleanup test table</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+  </hops>
+  <notepads>
+  </notepads>
+  <attributes/>
+</workflow>
+
diff --git a/integration-tests/database/main-0035-table-output-drop-columns.hwf 
b/integration-tests/database/main-0035-table-output-drop-columns.hwf
new file mode 100644
index 0000000000..7197f38cf5
--- /dev/null
+++ b/integration-tests/database/main-0035-table-output-drop-columns.hwf
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<workflow>
+  <name>main-0035-table-output-drop-columns</name>
+  <name_sync_with_filename>Y</name_sync_with_filename>
+  <description>Test automatic column removal - Phase 3</description>
+  <extended_description/>
+  <workflow_version/>
+  <created_user>-</created_user>
+  <created_date>2025/10/25 11:00:00.000</created_date>
+  <modified_user>-</modified_user>
+  <modified_date>2025/10/25 11:00:00.000</modified_date>
+  <parameters>
+    </parameters>
+  <actions>
+    <action>
+      <name>Start</name>
+      <description/>
+      <type>SPECIAL</type>
+      <attributes/>
+      <repeat>N</repeat>
+      <schedulerType>0</schedulerType>
+      <intervalSeconds>0</intervalSeconds>
+      <intervalMinutes>60</intervalMinutes>
+      <hour>12</hour>
+      <minutes>0</minutes>
+      <weekDay>1</weekDay>
+      <DayOfMonth>1</DayOfMonth>
+      <parallel>N</parallel>
+      <xloc>64</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Drop test table</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_drop_columns_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>262</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Create initial table with 4 columns</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+CREATE TABLE public.table_drop_columns_test
+(
+    id INTEGER,
+    name VARCHAR(50),
+    extra_column_1 VARCHAR(50),
+    extra_column_2 DOUBLE PRECISION
+);
+
+-- Insert some initial data to verify columns get dropped
+INSERT INTO public.table_drop_columns_test 
+VALUES (999, 'InitialData', 'ExtraData1', 123.45);
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>F</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>460</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>0033-table-output-drop-columns.hpl</name>
+      <description/>
+      <type>PIPELINE</type>
+      <attributes/>
+      <filename>${PROJECT_HOME}/0033-table-output-drop-columns.hpl</filename>
+      <params_from_previous>N</params_from_previous>
+      <exec_per_row>N</exec_per_row>
+      <clear_rows>N</clear_rows>
+      <clear_files>N</clear_files>
+      <set_logfile>N</set_logfile>
+      <logfile/>
+      <logext/>
+      <add_date>N</add_date>
+      <add_time>N</add_time>
+      <loglevel>Basic</loglevel>
+      <set_append_logfile>N</set_append_logfile>
+      <wait_until_finished>Y</wait_until_finished>
+      <create_parent_folder>N</create_parent_folder>
+      <run_configuration>local</run_configuration>
+      <parameters>
+        <pass_all_parameters>Y</pass_all_parameters>
+      </parameters>
+      <parallel>N</parallel>
+      <xloc>658</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Check 51 rows total (1 initial + 50 new)</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname>public</schemaname>
+      <tablename>table_drop_columns_test</tablename>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>51</limit>
+      <is_custom_sql>N</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql/>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>856</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Verify only 2 columns remain</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+-- This query should return exactly 2 rows (id and name columns only)
+-- extra_column_1 and extra_column_2 should be dropped
+SELECT column_name 
+FROM information_schema.columns 
+WHERE table_schema = 'public' 
+  AND table_name = 'table_drop_columns_test'
+ORDER BY column_name;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>1056</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Verify extra columns dropped</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname/>
+      <tablename/>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>0</limit>
+      <is_custom_sql>Y</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql>SELECT column_name 
+FROM information_schema.columns 
+WHERE table_schema = 'public' 
+  AND table_name = 'table_drop_columns_test'
+  AND column_name IN ('extra_column_1', 'extra_column_2')</custom_sql>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>1248</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Cleanup test table</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_drop_columns_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>1248</xloc>
+      <yloc>192</yloc>
+      <attributes_hac/>
+    </action>
+  </actions>
+  <hops>
+    <hop>
+      <from>Start</from>
+      <to>Drop test table</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>Y</unconditional>
+    </hop>
+    <hop>
+      <from>Drop test table</from>
+      <to>Create initial table with 4 columns</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Create initial table with 4 columns</from>
+      <to>0033-table-output-drop-columns.hpl</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>0033-table-output-drop-columns.hpl</from>
+      <to>Check 51 rows total (1 initial + 50 new)</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Check 51 rows total (1 initial + 50 new)</from>
+      <to>Verify only 2 columns remain</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Verify only 2 columns remain</from>
+      <to>Verify extra columns dropped</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Verify extra columns dropped</from>
+      <to>Cleanup test table</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+  </hops>
+  <notepads>
+  </notepads>
+  <attributes/>
+</workflow>
+
diff --git a/integration-tests/database/main-0036-table-output-change-types.hwf 
b/integration-tests/database/main-0036-table-output-change-types.hwf
new file mode 100644
index 0000000000..4d62e4a771
--- /dev/null
+++ b/integration-tests/database/main-0036-table-output-change-types.hwf
@@ -0,0 +1,261 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<workflow>
+  <name>main-0036-table-output-change-types</name>
+  <name_sync_with_filename>Y</name_sync_with_filename>
+  <description>Test automatic column type changes - Phase 4</description>
+  <extended_description/>
+  <workflow_version/>
+  <created_user>-</created_user>
+  <created_date>2025/10/25 11:30:00.000</created_date>
+  <modified_user>-</modified_user>
+  <modified_date>2025/10/25 11:30:00.000</modified_date>
+  <parameters>
+    </parameters>
+  <actions>
+    <action>
+      <name>Start</name>
+      <description/>
+      <type>SPECIAL</type>
+      <attributes/>
+      <repeat>N</repeat>
+      <schedulerType>0</schedulerType>
+      <intervalSeconds>0</intervalSeconds>
+      <intervalMinutes>60</intervalMinutes>
+      <hour>12</hour>
+      <minutes>0</minutes>
+      <weekDay>1</weekDay>
+      <DayOfMonth>1</DayOfMonth>
+      <parallel>N</parallel>
+      <xloc>64</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Drop test table</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_change_types_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>262</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Create initial table with VARCHAR age</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+CREATE TABLE public.table_change_types_test
+(
+    id INTEGER,
+    age VARCHAR(50),  -- Will be changed to INTEGER
+    name VARCHAR(100)
+);
+
+-- Note: Not inserting initial data because changing column types
+-- with existing data requires type casting which is database-specific
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>F</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>460</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>0033-table-output-change-types.hpl</name>
+      <description/>
+      <type>PIPELINE</type>
+      <attributes/>
+      <filename>${PROJECT_HOME}/0033-table-output-change-types.hpl</filename>
+      <params_from_previous>N</params_from_previous>
+      <exec_per_row>N</exec_per_row>
+      <clear_rows>N</clear_rows>
+      <clear_files>N</clear_files>
+      <set_logfile>N</set_logfile>
+      <logfile/>
+      <logext/>
+      <add_date>N</add_date>
+      <add_time>N</add_time>
+      <loglevel>Basic</loglevel>
+      <set_append_logfile>N</set_append_logfile>
+      <wait_until_finished>Y</wait_until_finished>
+      <create_parent_folder>N</create_parent_folder>
+      <run_configuration>local</run_configuration>
+      <parameters>
+        <pass_all_parameters>Y</pass_all_parameters>
+      </parameters>
+      <parallel>N</parallel>
+      <xloc>658</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Check 50 rows inserted</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname>public</schemaname>
+      <tablename>table_change_types_test</tablename>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>50</limit>
+      <is_custom_sql>N</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql/>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>856</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Verify age column is now INTEGER</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+-- Check that age column data type changed from VARCHAR to INTEGER
+-- This query should return 'integer' or similar type name
+SELECT data_type 
+FROM information_schema.columns 
+WHERE table_schema = 'public' 
+  AND table_name = 'table_change_types_test'
+  AND column_name = 'age';
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>1056</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Verify integer values work</name>
+      <description/>
+      <type>EVAL_TABLE_CONTENT</type>
+      <attributes/>
+      <connection>unit-test-db</connection>
+      <schemaname>public</schemaname>
+      <tablename>table_change_types_test</tablename>
+      <success_condition>rows_count_equal</success_condition>
+      <limit>50</limit>
+      <is_custom_sql>N</is_custom_sql>
+      <is_usevars>N</is_usevars>
+      <custom_sql/>
+      <add_rows_result>N</add_rows_result>
+      <clear_result_rows>Y</clear_result_rows>
+      <parallel>N</parallel>
+      <xloc>1248</xloc>
+      <yloc>48</yloc>
+      <attributes_hac/>
+    </action>
+    <action>
+      <name>Cleanup test table</name>
+      <description/>
+      <type>SQL</type>
+      <attributes/>
+      <sql>
+DROP TABLE IF EXISTS public.table_change_types_test;
+</sql>
+      <useVariableSubstitution>F</useVariableSubstitution>
+      <sqlfromfile>F</sqlfromfile>
+      <sqlfilename/>
+      <sendOneStatement>T</sendOneStatement>
+      <connection>unit-test-db</connection>
+      <parallel>N</parallel>
+      <xloc>1248</xloc>
+      <yloc>192</yloc>
+      <attributes_hac/>
+    </action>
+  </actions>
+  <hops>
+    <hop>
+      <from>Start</from>
+      <to>Drop test table</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>Y</unconditional>
+    </hop>
+    <hop>
+      <from>Drop test table</from>
+      <to>Create initial table with VARCHAR age</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Create initial table with VARCHAR age</from>
+      <to>0033-table-output-change-types.hpl</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>0033-table-output-change-types.hpl</from>
+      <to>Check 50 rows inserted</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Check 50 rows inserted</from>
+      <to>Verify age column is now INTEGER</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Verify age column is now INTEGER</from>
+      <to>Verify integer values work</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+    <hop>
+      <from>Verify integer values work</from>
+      <to>Cleanup test table</to>
+      <enabled>Y</enabled>
+      <evaluation>Y</evaluation>
+      <unconditional>N</unconditional>
+    </hop>
+  </hops>
+  <notepads>
+  </notepads>
+  <attributes/>
+</workflow>
+
diff --git 
a/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutput.java
 
b/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutput.java
index 103220d934..c13918d4dc 100644
--- 
a/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutput.java
+++ 
b/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutput.java
@@ -20,6 +20,7 @@ package org.apache.hop.pipeline.transforms.tableoutput;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -73,6 +74,19 @@ public class TableOutput extends 
BaseTransform<TableOutputMeta, TableOutputData>
       if (meta.isTruncateTable()) {
         truncateTable();
       }
+
+      // Handle automatic table structure updates
+      if (meta.isAutoUpdateTableStructure()) {
+        updateTableStructure();
+
+        // Clear prepared statement cache after DDL changes
+        // This ensures INSERT statements are rebuilt with the new table 
structure
+        if (data.preparedStatements != null) {
+          data.preparedStatements.clear();
+          logDetailed("Cleared prepared statement cache after table structure 
updates");
+        }
+      }
+
       data.outputRowMeta = getInputRowMeta().clone();
       meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, 
metadataProvider);
 
@@ -560,6 +574,479 @@ public class TableOutput extends 
BaseTransform<TableOutputMeta, TableOutputData>
     }
   }
 
+  void updateTableStructure() throws HopException {
+    if (!meta.isPartitioningEnabled() && !meta.isTableNameInField()) {
+      // Only the first one updates table structure in a non-partitioned 
transform copy
+      if ((getCopy() == 0) || !Utils.isEmpty(getPartitionId())) {
+        String schemaName = resolve(meta.getSchemaName());
+        String tableName = resolve(meta.getTableName());
+        String fullTableName =
+            data.databaseMeta.getQuotedSchemaTableCombination(this, 
schemaName, tableName);
+
+        // Handle drop and recreate option
+        if (meta.isAlwaysDropAndRecreate()) {
+          dropTable(fullTableName);
+        }
+
+        // Ensure table exists (same logic for both options)
+        ensureTableExists(fullTableName, schemaName, tableName);
+
+        // Add missing columns if option is enabled
+        if (meta.isAddColumns()) {
+          addMissingColumns(fullTableName, schemaName, tableName);
+        }
+
+        // Drop surplus columns if option is enabled
+        if (meta.isDropColumns()) {
+          dropSurplusColumns(fullTableName, schemaName, tableName);
+        }
+
+        // Change column types if option is enabled
+        if (meta.isChangeColumnTypes()) {
+          changeColumnTypes(fullTableName, schemaName, tableName);
+        }
+      }
+    }
+  }
+
+  private void dropTable(String fullTableName) throws HopException {
+    logBasic("Dropping table: " + fullTableName);
+    String dropSql = 
data.databaseMeta.getDropTableIfExistsStatement(fullTableName);
+    if (!Utils.isEmpty(dropSql)) {
+      try {
+        data.db.execStatement(dropSql);
+        // Commit the DDL to finalize table drop
+        data.db.commit();
+        logBasic("Dropped table: " + fullTableName);
+      } catch (Exception e) {
+        logDetailed("Drop table failed (may not exist): " + e.getMessage());
+        // Rollback transaction to clear any aborted state
+        try {
+          data.db.rollback();
+          logDetailed("Rolled back transaction after drop table failure");
+        } catch (Exception rollbackException) {
+          logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+        }
+      }
+    }
+  }
+
+  void ensureTableExists(String fullTableName, String schemaName, String 
tableName)
+      throws HopException {
+    // Try to create the table - if it already exists, the creation will fail
+    // This avoids the transaction-aborting table existence check
+    try {
+      createTable(fullTableName);
+    } catch (HopException e) {
+      // If creation failed, rollback and verify the table actually exists
+      // This handles the case where table creation failed for a reason other 
than "already exists"
+      logDetailed("Table creation failed or table already exists, 
verifying...");
+
+      // Rollback to clear any aborted transaction state before checking 
existence
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after create table failure in 
ensureTableExists");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+
+      boolean tableExists = checkTableExists(schemaName, tableName, true);
+      if (!tableExists) {
+        // Table still doesn't exist after failed creation attempt - this is a 
real error
+        throw new HopException(
+            "Failed to create table " + fullTableName + " and table does not 
exist", e);
+      } else {
+        logDetailed("Table already exists: " + fullTableName);
+      }
+    }
+  }
+
+  private boolean checkTableExists(String schemaName, String tableName, 
boolean bypassCache)
+      throws HopException {
+    if (bypassCache) {
+      // Clear cache to ensure fresh state
+      try {
+        
org.apache.hop.core.DbCache.getInstance().clear(data.databaseMeta.getName());
+        logDetailed("Cleared database cache to ensure fresh table state");
+      } catch (Exception cacheException) {
+        logError("Could not clear database cache: " + 
cacheException.getMessage());
+      }
+    }
+
+    try {
+      return data.db.checkTableExists(schemaName, tableName);
+    } catch (Exception e) {
+      logDetailed("Table existence check failed, assuming table doesn't exist: 
" + e.getMessage());
+      // Rollback transaction to clear any aborted state
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after table existence check 
failure");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+      return false; // Assume table doesn't exist if check fails
+    }
+  }
+
+  private void createTable(String fullTableName) throws HopException {
+    logBasic("Creating table: " + fullTableName);
+    IRowMeta inputRowMeta = getInputRowMeta();
+    String createSql =
+        data.db.getCreateTableStatement(fullTableName, inputRowMeta, null, 
false, null, true);
+    if (!Utils.isEmpty(createSql)) {
+      logDetailed("CREATE TABLE SQL: " + createSql);
+      data.db.execStatement(createSql);
+      // Commit the DDL to finalize table creation
+      data.db.commit();
+      logBasic("Successfully created and committed table: " + fullTableName);
+
+      // Clear cache after successful creation to ensure fresh state
+      try {
+        
org.apache.hop.core.DbCache.getInstance().clear(data.databaseMeta.getName());
+        logDetailed("Cleared database cache after table creation");
+      } catch (Exception cacheException) {
+        logError("Could not clear database cache: " + 
cacheException.getMessage());
+      }
+    }
+  }
+
+  private void addMissingColumns(String fullTableName, String schemaName, 
String tableName)
+      throws HopException {
+    logDetailed("Checking for missing columns in table: " + fullTableName);
+
+    try {
+      // Get the current table structure from the database
+      IRowMeta tableFields = data.db.getTableFieldsMeta(schemaName, tableName);
+      if (tableFields == null) {
+        logDetailed("Could not retrieve table structure for: " + 
fullTableName);
+        return;
+      }
+
+      // Get the incoming stream structure
+      IRowMeta streamFields = getInputRowMeta();
+      if (streamFields == null) {
+        logDetailed("No incoming stream fields available");
+        return;
+      }
+
+      // Find columns that exist in the stream but not in the table
+      List<IValueMeta> missingColumns = new ArrayList<>();
+      for (IValueMeta streamField : streamFields.getValueMetaList()) {
+        String fieldName = streamField.getName();
+        if (tableFields.searchValueMeta(fieldName) == null) {
+          missingColumns.add(streamField);
+          logBasic("Found missing column: " + fieldName + " (" + 
streamField.getTypeDesc() + ")");
+        }
+      }
+
+      // Add each missing column
+      if (!missingColumns.isEmpty()) {
+        for (IValueMeta missingColumn : missingColumns) {
+          addColumn(fullTableName, missingColumn);
+        }
+        logBasic(
+            "Added " + missingColumns.size() + " missing column(s) to table: " 
+ fullTableName);
+
+        // Clear cache after modifications
+        try {
+          
org.apache.hop.core.DbCache.getInstance().clear(data.databaseMeta.getName());
+          logDetailed("Cleared database cache after adding columns");
+        } catch (Exception cacheException) {
+          logError("Could not clear database cache: " + 
cacheException.getMessage());
+        }
+      } else {
+        logDetailed("No missing columns found in table: " + fullTableName);
+      }
+
+    } catch (Exception e) {
+      logError("Error checking/adding missing columns: " + e.getMessage(), e);
+      // Rollback to clear any aborted transaction state
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after add columns failure");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+      throw new HopException("Failed to add missing columns to table " + 
fullTableName, e);
+    }
+  }
+
+  private void addColumn(String fullTableName, IValueMeta column) throws 
HopException {
+    logDetailed("Adding column: " + column.getName() + " to table: " + 
fullTableName);
+
+    try {
+      String addColumnStatement =
+          data.databaseMeta.getAddColumnStatement(fullTableName, column, null, 
false, null, false);
+
+      if (!Utils.isEmpty(addColumnStatement)) {
+        logDetailed("ALTER TABLE SQL: " + addColumnStatement);
+        data.db.execStatement(addColumnStatement);
+        // Commit the DDL to finalize column addition
+        data.db.commit();
+        logBasic("Successfully added column: " + column.getName());
+      }
+    } catch (Exception e) {
+      logError("Error adding column " + column.getName() + ": " + 
e.getMessage(), e);
+      // Rollback to clear any aborted transaction state
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after add column failure");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+      throw new HopException("Failed to add column " + column.getName(), e);
+    }
+  }
+
+  private void dropSurplusColumns(String fullTableName, String schemaName, 
String tableName)
+      throws HopException {
+    logDetailed("Checking for surplus columns in table: " + fullTableName);
+
+    try {
+      // Get the current table structure from the database
+      IRowMeta tableFields = data.db.getTableFieldsMeta(schemaName, tableName);
+      if (tableFields == null) {
+        logDetailed("Could not retrieve table structure for: " + 
fullTableName);
+        return;
+      }
+
+      // Get the incoming stream structure
+      IRowMeta streamFields = getInputRowMeta();
+      if (streamFields == null) {
+        logDetailed("No incoming stream fields available");
+        return;
+      }
+
+      // Find columns that exist in the table but not in the stream
+      List<IValueMeta> surplusColumns = new ArrayList<>();
+      for (IValueMeta tableField : tableFields.getValueMetaList()) {
+        String fieldName = tableField.getName();
+        if (streamFields.searchValueMeta(fieldName) == null) {
+          surplusColumns.add(tableField);
+          logBasic("Found surplus column: " + fieldName + " (" + 
tableField.getTypeDesc() + ")");
+        }
+      }
+
+      // Drop each surplus column
+      if (!surplusColumns.isEmpty()) {
+        logBasic(
+            "WARNING: Dropping "
+                + surplusColumns.size()
+                + " column(s) from table: "
+                + fullTableName
+                + " - THIS WILL RESULT IN DATA LOSS");
+        for (IValueMeta surplusColumn : surplusColumns) {
+          dropColumn(fullTableName, surplusColumn);
+        }
+        logBasic(
+            "Dropped " + surplusColumns.size() + " surplus column(s) from 
table: " + fullTableName);
+
+        // Clear cache after modifications
+        try {
+          
org.apache.hop.core.DbCache.getInstance().clear(data.databaseMeta.getName());
+          logDetailed("Cleared database cache after dropping columns");
+        } catch (Exception cacheException) {
+          logError("Could not clear database cache: " + 
cacheException.getMessage());
+        }
+      } else {
+        logDetailed("No surplus columns found in table: " + fullTableName);
+      }
+
+    } catch (Exception e) {
+      logError("Error checking/dropping surplus columns: " + e.getMessage(), 
e);
+      // Rollback to clear any aborted transaction state
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after drop columns failure");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+      throw new HopException("Failed to drop surplus columns from table " + 
fullTableName, e);
+    }
+  }
+
+  private void dropColumn(String fullTableName, IValueMeta column) throws 
HopException {
+    logDetailed("Dropping column: " + column.getName() + " from table: " + 
fullTableName);
+
+    try {
+      String dropColumnStatement =
+          data.databaseMeta.getDropColumnStatement(fullTableName, column, 
null, false, null, false);
+
+      if (!Utils.isEmpty(dropColumnStatement)) {
+        logDetailed("ALTER TABLE SQL: " + dropColumnStatement);
+        data.db.execStatement(dropColumnStatement);
+        // Commit the DDL to finalize column drop
+        data.db.commit();
+        logBasic("Successfully dropped column: " + column.getName());
+      }
+    } catch (Exception e) {
+      logError("Error dropping column " + column.getName() + ": " + 
e.getMessage(), e);
+      // Rollback to clear any aborted transaction state
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after drop column failure");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+      throw new HopException("Failed to drop column " + column.getName(), e);
+    }
+  }
+
+  private void changeColumnTypes(String fullTableName, String schemaName, 
String tableName)
+      throws HopException {
+    logDetailed("Checking for column type mismatches in table: " + 
fullTableName);
+
+    try {
+      // Get the current table structure from the database
+      IRowMeta tableFields = data.db.getTableFieldsMeta(schemaName, tableName);
+      if (tableFields == null) {
+        logDetailed("Could not retrieve table structure for: " + 
fullTableName);
+        return;
+      }
+
+      // Get the incoming stream structure
+      IRowMeta streamFields = getInputRowMeta();
+      if (streamFields == null) {
+        logDetailed("No incoming stream fields available");
+        return;
+      }
+
+      // Find columns where types don't match
+      List<IValueMeta> columnsToModify = new ArrayList<>();
+      for (IValueMeta streamField : streamFields.getValueMetaList()) {
+        String fieldName = streamField.getName();
+        IValueMeta tableField = tableFields.searchValueMeta(fieldName);
+
+        if (tableField != null) {
+          // Column exists in both, check if types are compatible
+          if (!typesAreCompatible(tableField, streamField)) {
+            columnsToModify.add(streamField);
+            logBasic(
+                "Found type mismatch for column: "
+                    + fieldName
+                    + " (table: "
+                    + tableField.getTypeDesc()
+                    + ", stream: "
+                    + streamField.getTypeDesc()
+                    + ")");
+          }
+        }
+      }
+
+      // Modify each column that needs type change
+      if (!columnsToModify.isEmpty()) {
+        logBasic(
+            "WARNING: Changing data types for "
+                + columnsToModify.size()
+                + " column(s) in table: "
+                + fullTableName
+                + " - THIS MAY RESULT IN DATA LOSS");
+        for (IValueMeta columnToModify : columnsToModify) {
+          modifyColumn(fullTableName, columnToModify);
+        }
+        logBasic(
+            "Modified " + columnsToModify.size() + " column type(s) in table: 
" + fullTableName);
+
+        // Clear cache after modifications
+        try {
+          
org.apache.hop.core.DbCache.getInstance().clear(data.databaseMeta.getName());
+          logDetailed("Cleared database cache after modifying column types");
+        } catch (Exception cacheException) {
+          logError("Could not clear database cache: " + 
cacheException.getMessage());
+        }
+      } else {
+        logDetailed("No column type mismatches found in table: " + 
fullTableName);
+      }
+
+    } catch (Exception e) {
+      logError("Error checking/changing column types: " + e.getMessage(), e);
+      // Rollback to clear any aborted transaction state
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after change column types 
failure");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+      throw new HopException("Failed to change column types in table " + 
fullTableName, e);
+    }
+  }
+
+  boolean typesAreCompatible(IValueMeta tableField, IValueMeta streamField) {
+    // Same type - compatible
+    if (tableField.getType() == streamField.getType()) {
+      // For strings, check if stream length is greater than table length
+      if (tableField.getType() == IValueMeta.TYPE_STRING) {
+        int tableLength = tableField.getLength();
+        int streamLength = streamField.getLength();
+        // If stream has larger or undefined length, types are not compatible
+        if (streamLength > tableLength || (streamLength < 0 && tableLength > 
0)) {
+          return false;
+        }
+      }
+      // For numbers with precision, check if precision/scale differs
+      if (tableField.getType() == IValueMeta.TYPE_NUMBER
+          || tableField.getType() == IValueMeta.TYPE_BIGNUMBER) {
+        if (tableField.getPrecision() != streamField.getPrecision()
+            || tableField.getLength() != streamField.getLength()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    // Different types - generally not compatible, but some conversions are 
safe
+    // e.g., INTEGER -> BIGINT, but we'll require explicit type match for 
safety
+    return false;
+  }
+
+  private void modifyColumn(String fullTableName, IValueMeta column) throws 
HopException {
+    logDetailed(
+        "Modifying column: "
+            + column.getName()
+            + " to type: "
+            + column.getTypeDesc()
+            + " in table: "
+            + fullTableName);
+
+    try {
+      // For type changes, use drop/recreate approach as it's simpler and more 
reliable
+      // The complex modify statement from DatabaseMeta often fails with type 
conversions
+      logDetailed("Using drop/recreate approach for column type change: " + 
column.getName());
+
+      // Drop the existing column
+      String dropColumnStatement =
+          data.databaseMeta.getDropColumnStatement(fullTableName, column, 
null, false, null, false);
+      if (!Utils.isEmpty(dropColumnStatement)) {
+        logDetailed("DROP COLUMN SQL: " + dropColumnStatement);
+        data.db.execStatement(dropColumnStatement);
+        data.db.commit();
+        logDetailed("Dropped column: " + column.getName());
+      }
+
+      // Add the column with new type
+      String addColumnStatement =
+          data.databaseMeta.getAddColumnStatement(fullTableName, column, null, 
false, null, false);
+      if (!Utils.isEmpty(addColumnStatement)) {
+        logDetailed("ADD COLUMN SQL: " + addColumnStatement);
+        data.db.execStatement(addColumnStatement);
+        data.db.commit();
+        logDetailed("Added column with new type: " + column.getName());
+      }
+
+      logBasic("Successfully modified column type (via drop/recreate): " + 
column.getName());
+    } catch (Exception e) {
+      logError("Error modifying column " + column.getName() + ": " + 
e.getMessage(), e);
+      // Rollback to clear any aborted transaction state
+      try {
+        data.db.rollback();
+        logDetailed("Rolled back transaction after modify column failure");
+      } catch (Exception rollbackException) {
+        logDetailed("Could not rollback transaction: " + 
rollbackException.getMessage());
+      }
+      throw new HopException("Failed to modify column " + column.getName(), e);
+    }
+  }
+
   @Override
   public void dispose() {
 
diff --git 
a/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputDialog.java
 
b/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputDialog.java
index 55a320b668..66d75b0d7d 100644
--- 
a/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputDialog.java
+++ 
b/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputDialog.java
@@ -98,6 +98,21 @@ public class TableOutputDialog extends BaseTransformDialog {
 
   private Button wSpecifyFields;
 
+  private Label wlAutoUpdateTableStructure;
+  private Button wAutoUpdateTableStructure;
+
+  private Label wlAlwaysDropAndRecreate;
+  private Button wAlwaysDropAndRecreate;
+
+  private Label wlAddColumns;
+  private Button wAddColumns;
+
+  private Label wlDropColumns;
+  private Button wDropColumns;
+
+  private Label wlChangeColumnTypes;
+  private Button wChangeColumnTypes;
+
   private Label wlBatch;
   private Button wBatch;
 
@@ -384,6 +399,106 @@ public class TableOutputDialog extends 
BaseTransformDialog {
           }
         });
 
+    // Automatically update table structure
+    Label wlAutoUpdateTableStructure = new Label(shell, SWT.RIGHT);
+    wlAutoUpdateTableStructure.setText(
+        BaseMessages.getString(PKG, 
"TableOutputDialog.AutoUpdateTableStructure.Label"));
+    PropsUi.setLook(wlAutoUpdateTableStructure);
+    FormData fdlAutoUpdateTableStructure = new FormData();
+    fdlAutoUpdateTableStructure.left = new FormAttachment(0, 0);
+    fdlAutoUpdateTableStructure.top = new FormAttachment(wlSpecifyFields, 
margin);
+    fdlAutoUpdateTableStructure.right = new FormAttachment(middle, -margin);
+    wlAutoUpdateTableStructure.setLayoutData(fdlAutoUpdateTableStructure);
+    wAutoUpdateTableStructure = new Button(shell, SWT.CHECK);
+    PropsUi.setLook(wAutoUpdateTableStructure);
+    FormData fdAutoUpdateTableStructure = new FormData();
+    fdAutoUpdateTableStructure.left = new FormAttachment(middle, 0);
+    fdAutoUpdateTableStructure.top = new 
FormAttachment(wlAutoUpdateTableStructure, 0, SWT.CENTER);
+    fdAutoUpdateTableStructure.right = new FormAttachment(100, 0);
+    wAutoUpdateTableStructure.setLayoutData(fdAutoUpdateTableStructure);
+    wAutoUpdateTableStructure.addSelectionListener(lsSelMod);
+    wAutoUpdateTableStructure.addSelectionListener(
+        new SelectionAdapter() {
+          @Override
+          public void widgetSelected(SelectionEvent arg0) {
+            setFlags();
+          }
+        });
+
+    // Always drop and recreate table
+    Label wlAlwaysDropAndRecreate = new Label(shell, SWT.RIGHT);
+    wlAlwaysDropAndRecreate.setText(
+        BaseMessages.getString(PKG, 
"TableOutputDialog.AlwaysDropAndRecreate.Label"));
+    PropsUi.setLook(wlAlwaysDropAndRecreate);
+    FormData fdlAlwaysDropAndRecreate = new FormData();
+    fdlAlwaysDropAndRecreate.left = new FormAttachment(0, 0);
+    fdlAlwaysDropAndRecreate.top = new 
FormAttachment(wlAutoUpdateTableStructure, margin);
+    fdlAlwaysDropAndRecreate.right = new FormAttachment(middle, -margin);
+    wlAlwaysDropAndRecreate.setLayoutData(fdlAlwaysDropAndRecreate);
+    wAlwaysDropAndRecreate = new Button(shell, SWT.CHECK);
+    PropsUi.setLook(wAlwaysDropAndRecreate);
+    FormData fdAlwaysDropAndRecreate = new FormData();
+    fdAlwaysDropAndRecreate.left = new FormAttachment(middle, 0);
+    fdAlwaysDropAndRecreate.top = new FormAttachment(wlAlwaysDropAndRecreate, 
0, SWT.CENTER);
+    fdAlwaysDropAndRecreate.right = new FormAttachment(100, 0);
+    wAlwaysDropAndRecreate.setLayoutData(fdAlwaysDropAndRecreate);
+    wAlwaysDropAndRecreate.addSelectionListener(lsSelMod);
+
+    // Add columns
+    wlAddColumns = new Label(shell, SWT.RIGHT);
+    wlAddColumns.setText(BaseMessages.getString(PKG, 
"TableOutputDialog.AddColumns.Label"));
+    PropsUi.setLook(wlAddColumns);
+    FormData fdlAddColumns = new FormData();
+    fdlAddColumns.left = new FormAttachment(0, 0);
+    fdlAddColumns.top = new FormAttachment(wlAlwaysDropAndRecreate, margin);
+    fdlAddColumns.right = new FormAttachment(middle, -margin);
+    wlAddColumns.setLayoutData(fdlAddColumns);
+    wAddColumns = new Button(shell, SWT.CHECK);
+    PropsUi.setLook(wAddColumns);
+    FormData fdAddColumns = new FormData();
+    fdAddColumns.left = new FormAttachment(middle, 0);
+    fdAddColumns.top = new FormAttachment(wlAddColumns, 0, SWT.CENTER);
+    fdAddColumns.right = new FormAttachment(100, 0);
+    wAddColumns.setLayoutData(fdAddColumns);
+    wAddColumns.addSelectionListener(lsSelMod);
+
+    // Drop non-existing columns
+    wlDropColumns = new Label(shell, SWT.RIGHT);
+    wlDropColumns.setText(BaseMessages.getString(PKG, 
"TableOutputDialog.DropColumns.Label"));
+    PropsUi.setLook(wlDropColumns);
+    FormData fdlDropColumns = new FormData();
+    fdlDropColumns.left = new FormAttachment(0, 0);
+    fdlDropColumns.top = new FormAttachment(wlAddColumns, margin);
+    fdlDropColumns.right = new FormAttachment(middle, -margin);
+    wlDropColumns.setLayoutData(fdlDropColumns);
+    wDropColumns = new Button(shell, SWT.CHECK);
+    PropsUi.setLook(wDropColumns);
+    FormData fdDropColumns = new FormData();
+    fdDropColumns.left = new FormAttachment(middle, 0);
+    fdDropColumns.top = new FormAttachment(wlDropColumns, 0, SWT.CENTER);
+    fdDropColumns.right = new FormAttachment(100, 0);
+    wDropColumns.setLayoutData(fdDropColumns);
+    wDropColumns.addSelectionListener(lsSelMod);
+
+    // Change column data types
+    wlChangeColumnTypes = new Label(shell, SWT.RIGHT);
+    wlChangeColumnTypes.setText(
+        BaseMessages.getString(PKG, 
"TableOutputDialog.ChangeColumnTypes.Label"));
+    PropsUi.setLook(wlChangeColumnTypes);
+    FormData fdlChangeColumnTypes = new FormData();
+    fdlChangeColumnTypes.left = new FormAttachment(0, 0);
+    fdlChangeColumnTypes.top = new FormAttachment(wlDropColumns, margin);
+    fdlChangeColumnTypes.right = new FormAttachment(middle, -margin);
+    wlChangeColumnTypes.setLayoutData(fdlChangeColumnTypes);
+    wChangeColumnTypes = new Button(shell, SWT.CHECK);
+    PropsUi.setLook(wChangeColumnTypes);
+    FormData fdChangeColumnTypes = new FormData();
+    fdChangeColumnTypes.left = new FormAttachment(middle, 0);
+    fdChangeColumnTypes.top = new FormAttachment(wlChangeColumnTypes, 0, 
SWT.CENTER);
+    fdChangeColumnTypes.right = new FormAttachment(100, 0);
+    wChangeColumnTypes.setLayoutData(fdChangeColumnTypes);
+    wChangeColumnTypes.addSelectionListener(lsSelMod);
+
     CTabFolder wTabFolder = new CTabFolder(shell, SWT.BORDER);
     PropsUi.setLook(wTabFolder, Props.WIDGET_STYLE_TAB);
 
@@ -812,7 +927,7 @@ public class TableOutputDialog extends BaseTransformDialog {
 
     FormData fdTabFolder = new FormData();
     fdTabFolder.left = new FormAttachment(0, 0);
-    fdTabFolder.top = new FormAttachment(wSpecifyFields, 3 * margin);
+    fdTabFolder.top = new FormAttachment(wlChangeColumnTypes, 3 * margin);
     fdTabFolder.right = new FormAttachment(100, 0);
     fdTabFolder.bottom = new FormAttachment(wOk, -margin);
     wTabFolder.setLayoutData(fdTabFolder);
@@ -1226,6 +1341,42 @@ public class TableOutputDialog extends 
BaseTransformDialog {
     wlNameInTable.setEnabled(isTableNameInField && !specifyFields);
     wNameInTable.setEnabled(isTableNameInField && !specifyFields);
 
+    // Handle auto update table structure options (only if UI components are 
initialized)
+    if (wAutoUpdateTableStructure != null
+        && wAlwaysDropAndRecreate != null
+        && wlAutoUpdateTableStructure != null
+        && wlAlwaysDropAndRecreate != null) {
+      boolean autoUpdateTableStructure = 
wAutoUpdateTableStructure.getSelection();
+      boolean alwaysDropAndRecreate = wAlwaysDropAndRecreate.getSelection();
+
+      // Auto update table structure is incompatible with specify fields
+      boolean enableAutoUpdate = !specifyFields;
+      boolean enableAlwaysDropAndRecreate = autoUpdateTableStructure && 
enableAutoUpdate;
+
+      // If specify fields is enabled, disable auto update table structure
+      if (specifyFields && autoUpdateTableStructure) {
+        wAutoUpdateTableStructure.setSelection(false);
+        autoUpdateTableStructure = false;
+      }
+
+      // If auto update is disabled, disable always drop and recreate
+      if (!autoUpdateTableStructure && alwaysDropAndRecreate) {
+        wAlwaysDropAndRecreate.setSelection(false);
+        alwaysDropAndRecreate = false;
+      }
+
+      wlAutoUpdateTableStructure.setEnabled(enableAutoUpdate);
+      wAutoUpdateTableStructure.setEnabled(enableAutoUpdate);
+      wlAlwaysDropAndRecreate.setEnabled(enableAlwaysDropAndRecreate);
+      wAlwaysDropAndRecreate.setEnabled(enableAlwaysDropAndRecreate);
+      wlAddColumns.setEnabled(enableAlwaysDropAndRecreate);
+      wAddColumns.setEnabled(enableAlwaysDropAndRecreate);
+      wlDropColumns.setEnabled(enableAlwaysDropAndRecreate);
+      wDropColumns.setEnabled(enableAlwaysDropAndRecreate);
+      wlChangeColumnTypes.setEnabled(enableAlwaysDropAndRecreate);
+      wChangeColumnTypes.setEnabled(enableAlwaysDropAndRecreate);
+    }
+
     DatabaseMeta databaseMeta = 
pipelineMeta.findDatabase(wConnection.getText(), variables);
     if (databaseMeta != null) {
       if (!databaseMeta.supportsAutoGeneratedKeys()) {
@@ -1283,6 +1434,22 @@ public class TableOutputDialog extends 
BaseTransformDialog {
 
     wSpecifyFields.setSelection(input.isSpecifyFields());
 
+    if (wAutoUpdateTableStructure != null) {
+      
wAutoUpdateTableStructure.setSelection(input.isAutoUpdateTableStructure());
+    }
+    if (wAlwaysDropAndRecreate != null) {
+      wAlwaysDropAndRecreate.setSelection(input.isAlwaysDropAndRecreate());
+    }
+    if (wAddColumns != null) {
+      wAddColumns.setSelection(input.isAddColumns());
+    }
+    if (wDropColumns != null) {
+      wDropColumns.setSelection(input.isDropColumns());
+    }
+    if (wChangeColumnTypes != null) {
+      wChangeColumnTypes.setSelection(input.isChangeColumnTypes());
+    }
+
     for (int i = 0; i < input.getFields().size(); i++) {
       TableOutputField tf = input.getFields().get(i);
       TableItem item = wFields.table.getItem(i);
@@ -1339,6 +1506,21 @@ public class TableOutputDialog extends 
BaseTransformDialog {
     info.setReturningGeneratedKeys(wReturnKeys.getSelection());
     info.setGeneratedKeyField(wReturnField.getText());
     info.setSpecifyFields(wSpecifyFields.getSelection());
+    if (wAutoUpdateTableStructure != null) {
+      
info.setAutoUpdateTableStructure(wAutoUpdateTableStructure.getSelection());
+    }
+    if (wAlwaysDropAndRecreate != null) {
+      info.setAlwaysDropAndRecreate(wAlwaysDropAndRecreate.getSelection());
+    }
+    if (wAddColumns != null) {
+      info.setAddColumns(wAddColumns.getSelection());
+    }
+    if (wDropColumns != null) {
+      info.setDropColumns(wDropColumns.getSelection());
+    }
+    if (wChangeColumnTypes != null) {
+      info.setChangeColumnTypes(wChangeColumnTypes.getSelection());
+    }
 
     int nrRows = wFields.nrNonEmpty();
     info.getFields().clear();
diff --git 
a/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMeta.java
 
b/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMeta.java
index 0c43a8adf2..92359d4bf2 100644
--- 
a/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMeta.java
+++ 
b/plugins/transforms/tableoutput/src/main/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMeta.java
@@ -175,6 +175,41 @@ public class TableOutputMeta extends 
BaseTransformMeta<TableOutput, TableOutputD
       injectionKeyDescription = 
"TableOutputMeta.Injection.SpecifyFields.Field")
   private boolean specifyFields;
 
+  /** Automatically update table structure based on incoming data stream */
+  @HopMetadataProperty(
+      key = "auto_update_table_structure",
+      injectionKey = "AUTO_UPDATE_TABLE_STRUCTURE",
+      injectionKeyDescription = 
"TableOutputMeta.Injection.AutoUpdateTableStructure.Field")
+  private boolean autoUpdateTableStructure;
+
+  /** Always drop and recreate table when auto-updating structure */
+  @HopMetadataProperty(
+      key = "always_drop_and_recreate",
+      injectionKey = "ALWAYS_DROP_AND_RECREATE",
+      injectionKeyDescription = 
"TableOutputMeta.Injection.AlwaysDropAndRecreate.Field")
+  private boolean alwaysDropAndRecreate;
+
+  /** Add columns from incoming stream that don't exist in target table */
+  @HopMetadataProperty(
+      key = "add_columns",
+      injectionKey = "ADD_COLUMNS",
+      injectionKeyDescription = "TableOutputMeta.Injection.AddColumns.Field")
+  private boolean addColumns;
+
+  /** Drop columns from table that don't exist in incoming stream */
+  @HopMetadataProperty(
+      key = "drop_columns",
+      injectionKey = "DROP_COLUMNS",
+      injectionKeyDescription = "TableOutputMeta.Injection.DropColumns.Field")
+  private boolean dropColumns;
+
+  /** Change column data types to match incoming stream types */
+  @HopMetadataProperty(
+      key = "change_column_types",
+      injectionKey = "CHANGE_COLUMN_TYPES",
+      injectionKeyDescription = 
"TableOutputMeta.Injection.ChangeColumnTypes.Field")
+  private boolean changeColumnTypes;
+
   @HopMetadataProperty(
       groupKey = "fields",
       key = "field",
diff --git 
a/plugins/transforms/tableoutput/src/main/resources/org/apache/hop/pipeline/transforms/tableoutput/messages/messages_en_US.properties
 
b/plugins/transforms/tableoutput/src/main/resources/org/apache/hop/pipeline/transforms/tableoutput/messages/messages_en_US.properties
index aeb97acb36..439c7b72c9 100644
--- 
a/plugins/transforms/tableoutput/src/main/resources/org/apache/hop/pipeline/transforms/tableoutput/messages/messages_en_US.properties
+++ 
b/plugins/transforms/tableoutput/src/main/resources/org/apache/hop/pipeline/transforms/tableoutput/messages/messages_en_US.properties
@@ -73,6 +73,13 @@ TableOutputDialog.ReturnField.Label=Name of auto-generated 
key field
 TableOutputDialog.ReturnKeys.Label=Return auto-generated key
 TableOutputDialog.ReturnKeys.Tooltip=Check this option to return the 
auto-generated key.
 TableOutputDialog.SpecifyFields.Label=Specify database fields
+TableOutputDialog.AutoUpdateTableStructure.Label=Automatically update table 
structure
+TableOutputDialog.AlwaysDropAndRecreate.Label=Always drop and recreate table
+TableOutputDialog.AddColumns.Label=Add columns
+TableOutputDialog.DropColumns.Label=Drop non-existing columns
+TableOutputDialog.ChangeColumnTypes.Label=Change column data types
+TableOutputDialog.DdlOptions.Open.Label=DDL Options (click to open)
+TableOutputDialog.DdlOptions.Close.Label=DDL Options (click to close)
 TableOutputDialog.TargetSchema.Label=Target schema
 TableOutputDialog.TargetTable.Label=Target table
 TableOutputDialog.TruncateTable.Label=Truncate table
@@ -126,4 +133,9 @@ TableOutputMeta.Injection.TableNameInField.Field=Table''s 
name defined in a fiel
 TableOutputMeta.Injection.TableNameInTable.Field=Store the table''s name 
field? (Y/N)
 TableOutputMeta.Injection.TruncateTable.Field=Truncate table? (Y/N)
 TableOutputMeta.Injection.UseBatch.Field=Use batch update for inserts? (Y/N)
+TableOutputMeta.Injection.AutoUpdateTableStructure.Field=Automatically update 
table structure? (Y/N)
+TableOutputMeta.Injection.AlwaysDropAndRecreate.Field=Always drop and recreate 
table? (Y/N)
+TableOutputMeta.Injection.AddColumns.Field=Add columns? (Y/N)
+TableOutputMeta.Injection.DropColumns.Field=Drop non-existing columns? (Y/N)
+TableOutputMeta.Injection.ChangeColumnTypes.Field=Change column data types? 
(Y/N)
 TableOutputMeta.keyword=table,output
diff --git 
a/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMetaTest.java
 
b/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMetaTest.java
index 8f3d4f3c6d..a7d5dfd30b 100644
--- 
a/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMetaTest.java
+++ 
b/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputMetaTest.java
@@ -79,6 +79,13 @@ class TableOutputMetaTest {
     assertTrue(tableOutputMeta.isTableNameInTable());
     assertEquals("", tableOutputMeta.getTableNameField());
     assertFalse(tableOutputMeta.isSpecifyFields());
+
+    // New DDL options should default to false
+    assertFalse(tableOutputMeta.isAutoUpdateTableStructure());
+    assertFalse(tableOutputMeta.isAlwaysDropAndRecreate());
+    assertFalse(tableOutputMeta.isAddColumns());
+    assertFalse(tableOutputMeta.isDropColumns());
+    assertFalse(tableOutputMeta.isChangeColumnTypes());
   }
 
   @Test
@@ -118,6 +125,86 @@ class TableOutputMetaTest {
     tester.testSerialization();
   }
 
+  @Test
+  void testAutoUpdateTableStructureProperty() {
+    TableOutputMeta meta = new TableOutputMeta();
+
+    // Default value
+    assertFalse(meta.isAutoUpdateTableStructure());
+
+    // Set to true
+    meta.setAutoUpdateTableStructure(true);
+    assertTrue(meta.isAutoUpdateTableStructure());
+
+    // Set to false
+    meta.setAutoUpdateTableStructure(false);
+    assertFalse(meta.isAutoUpdateTableStructure());
+  }
+
+  @Test
+  void testAlwaysDropAndRecreateProperty() {
+    TableOutputMeta meta = new TableOutputMeta();
+
+    // Default value
+    assertFalse(meta.isAlwaysDropAndRecreate());
+
+    // Set to true
+    meta.setAlwaysDropAndRecreate(true);
+    assertTrue(meta.isAlwaysDropAndRecreate());
+
+    // Set to false
+    meta.setAlwaysDropAndRecreate(false);
+    assertFalse(meta.isAlwaysDropAndRecreate());
+  }
+
+  @Test
+  void testAddColumnsProperty() {
+    TableOutputMeta meta = new TableOutputMeta();
+
+    // Default value
+    assertFalse(meta.isAddColumns());
+
+    // Set to true
+    meta.setAddColumns(true);
+    assertTrue(meta.isAddColumns());
+
+    // Set to false
+    meta.setAddColumns(false);
+    assertFalse(meta.isAddColumns());
+  }
+
+  @Test
+  void testDropColumnsProperty() {
+    TableOutputMeta meta = new TableOutputMeta();
+
+    // Default value
+    assertFalse(meta.isDropColumns());
+
+    // Set to true
+    meta.setDropColumns(true);
+    assertTrue(meta.isDropColumns());
+
+    // Set to false
+    meta.setDropColumns(false);
+    assertFalse(meta.isDropColumns());
+  }
+
+  @Test
+  void testChangeColumnTypesProperty() {
+    TableOutputMeta meta = new TableOutputMeta();
+
+    // Default value
+    assertFalse(meta.isChangeColumnTypes());
+
+    // Set to true
+    meta.setChangeColumnTypes(true);
+    assertTrue(meta.isChangeColumnTypes());
+
+    // Set to false
+    meta.setChangeColumnTypes(false);
+    assertFalse(meta.isChangeColumnTypes());
+  }
+
   public class TableOutputFieldInputFieldLoadSaveValidator
       implements IFieldLoadSaveValidator<TableOutputField> {
     final Random rand = new Random();
diff --git 
a/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputTest.java
 
b/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputTest.java
index d7e128861f..8141162a58 100644
--- 
a/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputTest.java
+++ 
b/plugins/transforms/tableoutput/src/test/java/org/apache/hop/pipeline/transforms/tableoutput/TableOutputTest.java
@@ -38,6 +38,10 @@ import org.apache.hop.core.database.DatabaseMeta;
 import org.apache.hop.core.database.IDatabase;
 import org.apache.hop.core.exception.HopException;
 import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.value.ValueMetaInteger;
+import org.apache.hop.core.row.value.ValueMetaNumber;
+import org.apache.hop.core.row.value.ValueMetaString;
 import org.apache.hop.core.variables.IVariables;
 import org.apache.hop.pipeline.PipelineMeta;
 import org.apache.hop.pipeline.engines.local.LocalPipelineEngine;
@@ -212,4 +216,120 @@ class TableOutputTest {
     verify(tableOutputSpy, times(1))
         .logError("An error occurred initializing this transform: " + 
ke.getMessage());
   }
+
+  // ==================== DDL Feature Tests ====================
+
+  @Test
+  void testUpdateTableStructure_disabled() throws Exception {
+    when(tableOutputMeta.isAutoUpdateTableStructure()).thenReturn(false);
+
+    tableOutputSpy.updateTableStructure();
+
+    // updateTableStructure should return early when disabled
+    // No need to verify anything - just checking it doesn't throw exception
+  }
+
+  @Test
+  void testUpdateTableStructure_partitioningEnabled() throws Exception {
+    when(tableOutputMeta.isAutoUpdateTableStructure()).thenReturn(true);
+    when(tableOutputMeta.isPartitioningEnabled()).thenReturn(true);
+
+    tableOutputSpy.updateTableStructure();
+
+    // updateTableStructure should return early when partitioning is enabled
+  }
+
+  @Test
+  void testUpdateTableStructure_tableNameInField() throws Exception {
+    when(tableOutputMeta.isAutoUpdateTableStructure()).thenReturn(true);
+    when(tableOutputMeta.isPartitioningEnabled()).thenReturn(false);
+    when(tableOutputMeta.isTableNameInField()).thenReturn(true);
+
+    tableOutputSpy.updateTableStructure();
+
+    // updateTableStructure should return early when table name is in field
+  }
+
+  @Test
+  void testUpdateTableStructure_notFirstCopy() throws Exception {
+    when(tableOutputMeta.isAutoUpdateTableStructure()).thenReturn(true);
+    when(tableOutputMeta.isPartitioningEnabled()).thenReturn(false);
+    when(tableOutputMeta.isTableNameInField()).thenReturn(false);
+    when(tableOutputSpy.getCopy()).thenReturn(1);
+    when(tableOutputSpy.getPartitionId()).thenReturn("");
+
+    tableOutputSpy.updateTableStructure();
+
+    // updateTableStructure should return early when not first copy and no 
partition ID
+  }
+
+  @Test
+  void testTypesAreCompatible_sameTypeInteger() {
+    IValueMeta tableField = new ValueMetaInteger("age");
+    IValueMeta streamField = new ValueMetaInteger("age");
+
+    assertTrue(tableOutputSpy.typesAreCompatible(tableField, streamField));
+  }
+
+  @Test
+  void testTypesAreCompatible_differentTypes() {
+    IValueMeta tableField = new ValueMetaString("age");
+    IValueMeta streamField = new ValueMetaInteger("age");
+
+    assertFalse(tableOutputSpy.typesAreCompatible(tableField, streamField));
+  }
+
+  @Test
+  void testTypesAreCompatible_stringLengthCompatible() {
+    IValueMeta tableField = new ValueMetaString("name");
+    tableField.setLength(100);
+    IValueMeta streamField = new ValueMetaString("name");
+    streamField.setLength(50);
+
+    assertTrue(tableOutputSpy.typesAreCompatible(tableField, streamField));
+  }
+
+  @Test
+  void testTypesAreCompatible_stringLengthIncompatible() {
+    IValueMeta tableField = new ValueMetaString("name");
+    tableField.setLength(50);
+    IValueMeta streamField = new ValueMetaString("name");
+    streamField.setLength(100);
+
+    assertFalse(tableOutputSpy.typesAreCompatible(tableField, streamField));
+  }
+
+  @Test
+  void testTypesAreCompatible_stringUndefinedLengthIncompatible() {
+    IValueMeta tableField = new ValueMetaString("name");
+    tableField.setLength(50);
+    IValueMeta streamField = new ValueMetaString("name");
+    streamField.setLength(-1); // Undefined length
+
+    assertFalse(tableOutputSpy.typesAreCompatible(tableField, streamField));
+  }
+
+  @Test
+  void testTypesAreCompatible_numberPrecisionIncompatible() {
+    IValueMeta tableField = new ValueMetaNumber("amount");
+    tableField.setLength(10);
+    tableField.setPrecision(2);
+    IValueMeta streamField = new ValueMetaNumber("amount");
+    streamField.setLength(15);
+    streamField.setPrecision(5);
+
+    assertFalse(tableOutputSpy.typesAreCompatible(tableField, streamField));
+  }
+
+  @Test
+  void testTypesAreCompatible_numberPrecisionCompatible() {
+    IValueMeta tableField = new ValueMetaNumber("amount");
+    tableField.setLength(10);
+    tableField.setPrecision(2);
+    IValueMeta streamField = new ValueMetaNumber("amount");
+    streamField.setLength(10);
+    streamField.setPrecision(2);
+
+    assertTrue(tableOutputSpy.typesAreCompatible(tableField, streamField));
+  }
 }

Reply via email to